• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 13372869678

17 Feb 2025 02:46PM UTC coverage: 82.381%. First build
13372869678

Pull #14671

github

web-flow
Merge 2a8d8dfaf into af3774130
Pull Request #14671: fix(mqtt action): make `tcp_closed` and `closed` recoverable errors

2 of 6 new or added lines in 2 files covered. (33.33%)

57470 of 69761 relevant lines covered (82.38%)

15483.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.34
/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16
-module(emqx_bridge_mqtt_connector).
17

18
-include_lib("emqx/include/emqx_mqtt.hrl").
19
-include_lib("emqx/include/logger.hrl").
20
-include_lib("emqx_resource/include/emqx_resource.hrl").
21

22
-behaviour(emqx_resource).
23
-behaviour(ecpool_worker).
24

25
%% ecpool
26
-export([connect/1]).
27

28
-export([on_message_received/3]).
29
-export([handle_disconnect/1]).
30

31
%% callbacks of behaviour emqx_resource
32
-export([
33
    resource_type/0,
34
    callback_mode/0,
35
    on_start/2,
36
    on_stop/2,
37
    on_query/3,
38
    on_query_async/4,
39
    on_get_status/2,
40
    on_add_channel/4,
41
    on_remove_channel/3,
42
    on_get_channel_status/3,
43
    on_get_channels/1
44
]).
45

46
-export([on_async_result/2]).
47

48
-type name() :: term().
49

50
-type option() ::
51
    {name, name()}
52
    | {ingress, map()}
53
    %% see `emqtt:option()`
54
    | {client_opts, map()}.
55

56
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
57

58
-define(HEALTH_CHECK_TIMEOUT, 1000).
59
-define(NO_PREFIX, <<>>).
60
-define(IS_NO_PREFIX(P), (P =:= undefined orelse P =:= ?NO_PREFIX)).
61
-define(MAX_PREFIX_BYTES, 19).
62
-define(AUTO_RECONNECT_INTERVAL_S, 2).
63

64
-type clientid() :: binary().
65
-type channel_resource_id() :: action_resource_id() | source_resource_id().
66
-type connector_state() :: #{
67
    pool_name := connector_resource_id(),
68
    installed_channels := #{channel_resource_id() => channel_state()},
69
    clean_start := boolean(),
70
    available_clientids := [clientid()],
71
    topic_to_handler_index := ets:table(),
72
    server := string()
73
}.
74
-type channel_state() :: _Todo :: map().
75

76
%% ===================================================================
77
%% When use this bridge as a data source, ?MODULE:on_message_received will be called
78
%% if the bridge received msgs from the remote broker.
79

80
on_message_received(Msg, HookPoints, ResId) ->
81
    emqx_resource_metrics:received_inc(ResId),
24✔
82
    lists:foreach(
24✔
83
        fun(HookPoint) ->
84
            emqx_hooks:run(HookPoint, [Msg])
48✔
85
        end,
86
        HookPoints
87
    ),
88
    ok.
24✔
89

90
%% ===================================================================
91
resource_type() -> mqtt.
136✔
92

93
callback_mode() -> async_if_possible.
136✔
94

95
-spec on_start(connector_resource_id(), map()) -> {ok, connector_state()} | {error, term()}.
96
on_start(ResourceId, #{server := Server} = Conf) ->
97
    ?SLOG(info, #{
163✔
98
        msg => "starting_mqtt_connector",
99
        connector => ResourceId,
100
        config => emqx_utils:redact(Conf)
101
    }),
163✔
102
    TopicToHandlerIndex = emqx_topic_index:new(),
163✔
103
    StartConf = Conf#{topic_to_handler_index => TopicToHandlerIndex},
163✔
104
    case start_mqtt_clients(ResourceId, StartConf) of
163✔
105
        {ok, Result1} ->
106
            {ok, Result1#{
72✔
107
                installed_channels => #{},
108
                clean_start => maps:get(clean_start, Conf),
109
                topic_to_handler_index => TopicToHandlerIndex,
110
                server => Server
111
            }};
112
        {error, Reason} ->
113
            {error, emqx_maybe:define(explain_error(Reason), Reason)}
90✔
114
    end.
115

116
on_add_channel(
117
    _InstId,
118
    #{
119
        installed_channels := InstalledChannels,
120
        clean_start := CleanStart
121
    } = OldState,
122
    ChannelId,
123
    #{config_root := actions} = ChannelConfig
124
) ->
125
    %% Publisher channel
126
    %% make a warning if clean_start is set to false
127
    case CleanStart of
45✔
128
        false ->
129
            ?tp(
2✔
130
                mqtt_clean_start_egress_action_warning,
131
                #{
132
                    channel_id => ChannelId,
133
                    resource_id => _InstId
134
                }
135
            ),
136
            ?SLOG(warning, #{
2✔
137
                msg => "mqtt_publisher_clean_start_false",
138
                reason => "clean_start is set to false when using MQTT publisher action, " ++
139
                    "which may cause unexpected behavior. " ++
140
                    "For example, if the client ID is already subscribed to topics, " ++
141
                    "we might receive messages that are unhanded.",
142
                channel => ChannelId,
143
                config => emqx_utils:redact(ChannelConfig)
144
            });
×
145
        true ->
146
            ok
43✔
147
    end,
148
    RemoteParams0 = maps:get(parameters, ChannelConfig),
45✔
149
    {LocalParams, RemoteParams} = take(local, RemoteParams0, #{}),
45✔
150
    ChannelState = emqx_bridge_mqtt_egress:config(#{remote => RemoteParams, local => LocalParams}),
45✔
151
    NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
45✔
152
    NewState = OldState#{installed_channels => NewInstalledChannels},
45✔
153
    {ok, NewState};
45✔
154
on_add_channel(
155
    _ResourceId,
156
    #{
157
        installed_channels := InstalledChannels,
158
        pool_name := PoolName,
159
        topic_to_handler_index := TopicToHandlerIndex,
160
        server := Server
161
    } = OldState,
162
    ChannelId,
163
    #{hookpoints := HookPoints} = ChannelConfig
164
) ->
165
    %% Add ingress channel
166
    RemoteParams0 = maps:get(parameters, ChannelConfig),
26✔
167
    {LocalParams, RemoteParams} = take(local, RemoteParams0, #{}),
26✔
168
    ChannelState0 = #{
26✔
169
        hookpoints => HookPoints,
170
        server => Server,
171
        config_root => sources,
172
        local => LocalParams,
173
        remote => RemoteParams
174
    },
175
    ChannelState1 = mk_ingress_config(ChannelId, ChannelState0, TopicToHandlerIndex),
26✔
176
    ok = emqx_bridge_mqtt_ingress:subscribe_channel(PoolName, ChannelState1),
26✔
177
    NewInstalledChannels = maps:put(ChannelId, ChannelState1, InstalledChannels),
26✔
178
    NewState = OldState#{installed_channels => NewInstalledChannels},
26✔
179
    {ok, NewState}.
26✔
180

181
on_remove_channel(
182
    _InstId,
183
    #{
184
        installed_channels := InstalledChannels,
185
        pool_name := PoolName,
186
        topic_to_handler_index := TopicToHandlerIndex
187
    } = OldState,
188
    ChannelId
189
) ->
190
    case maps:find(ChannelId, InstalledChannels) of
68✔
191
        error ->
192
            %% maybe the channel failed to be added, just ignore it
193
            {ok, OldState};
×
194
        {ok, ChannelState} ->
195
            case ChannelState of
68✔
196
                #{config_root := sources} ->
197
                    ok = emqx_bridge_mqtt_ingress:unsubscribe_channel(
26✔
198
                        PoolName, ChannelState, ChannelId, TopicToHandlerIndex
199
                    );
200
                _ ->
201
                    ok
42✔
202
            end,
203
            NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
68✔
204
            %% Update state
205
            NewState = OldState#{installed_channels => NewInstalledChannels},
68✔
206
            {ok, NewState}
68✔
207
    end.
208

209
on_get_channel_status(
210
    _ResId,
211
    ChannelId,
212
    #{
213
        available_clientids := AvailableClientids,
214
        installed_channels := Channels
215
    } = _State
216
) when is_map_key(ChannelId, Channels) ->
217
    case AvailableClientids of
101✔
218
        [] ->
219
            %% We should mark this connector as unhealthy so messages fail fast and an
220
            %% alarm is raised.
221
            {?status_disconnected, {unhealthy_target, <<"No clientids assigned to this node">>}};
×
222
        [_ | _] ->
223
            %% The channel should be ok as long as the MQTT client is ok
224
            ?status_connected
101✔
225
    end.
226

227
on_get_channels(ResId) ->
228
    emqx_bridge_v2:get_channels_for_connector(ResId).
604✔
229

230
start_mqtt_clients(ResourceId, Conf) ->
231
    ClientOpts = mk_ecpool_client_opts(ResourceId, Conf),
163✔
232
    start_mqtt_clients(ResourceId, Conf, ClientOpts).
162✔
233

234
find_my_static_clientids(#{static_clientids := [_ | _] = Entries}) ->
235
    NodeBin = atom_to_binary(node()),
18✔
236
    MyConfig =
18✔
237
        lists:filtermap(
238
            fun(#{node := N, ids := Ids}) ->
239
                case N =:= NodeBin of
46✔
240
                    true ->
241
                        {true, Ids};
18✔
242
                    false ->
243
                        false
28✔
244
                end
245
            end,
246
            Entries
247
        ),
248
    {ok, lists:flatten(MyConfig)};
18✔
249
find_my_static_clientids(#{} = _Conf) ->
250
    error.
153✔
251

252
start_mqtt_clients(ResourceId, StartConf, ClientOpts) ->
253
    PoolName = ResourceId,
162✔
254
    PoolSize = get_pool_size(StartConf),
162✔
255
    AvailableClientids = get_available_clientids(StartConf, ClientOpts),
162✔
256
    Options = [
162✔
257
        {name, PoolName},
258
        {pool_size, PoolSize},
259
        {available_clientids, AvailableClientids},
260
        {client_opts, ClientOpts},
261
        {auto_reconnect, ?AUTO_RECONNECT_INTERVAL_S}
262
    ],
263
    ok = emqx_resource:allocate_resource(ResourceId, pool_name, PoolName),
162✔
264
    case emqx_resource_pool:start(PoolName, ?MODULE, Options) of
162✔
265
        ok ->
266
            {ok, #{pool_name => PoolName, available_clientids => AvailableClientids}};
72✔
267
        {error, {start_pool_failed, _, Reason}} ->
268
            {error, Reason}
90✔
269
    end.
270

271
get_pool_size(#{static_clientids := [_ | _]} = Conf) ->
272
    {ok, Ids} = find_my_static_clientids(Conf),
9✔
273
    length(Ids);
9✔
274
get_pool_size(#{pool_size := PoolSize}) ->
275
    PoolSize.
153✔
276

277
get_available_clientids(#{} = Conf, ClientOpts) ->
278
    case find_my_static_clientids(Conf) of
162✔
279
        {ok, Ids} ->
280
            Ids;
9✔
281
        error ->
282
            #{pool_size := PoolSize} = Conf,
153✔
283
            #{clientid := ClientIdPrefix} = ClientOpts,
153✔
284
            lists:map(
153✔
285
                fun(WorkerId) ->
286
                    mk_clientid(WorkerId, ClientIdPrefix)
1,117✔
287
                end,
288
                lists:seq(1, PoolSize)
289
            )
290
    end.
291

292
on_stop(ResourceId, State) ->
293
    ?SLOG(info, #{
148✔
294
        msg => "stopping_mqtt_connector",
295
        resource_id => ResourceId
296
    }),
148✔
297
    %% on_stop can be called with State = undefined
298
    StateMap =
148✔
299
        case State of
300
            Map when is_map(State) ->
301
                Map;
67✔
302
            _ ->
303
                #{}
81✔
304
        end,
305
    case maps:get(topic_to_handler_index, StateMap, undefined) of
148✔
306
        undefined ->
307
            ok;
81✔
308
        TopicToHandlerIndex ->
309
            ets:delete(TopicToHandlerIndex)
67✔
310
    end,
311
    Allocated = emqx_resource:get_allocated_resources(ResourceId),
148✔
312
    ok = stop_helper(Allocated),
148✔
313
    ?tp(mqtt_connector_stopped, #{instance_id => ResourceId}),
148✔
314
    ok.
148✔
315

316
stop_helper(#{pool_name := PoolName}) ->
317
    emqx_resource_pool:stop(PoolName).
148✔
318

319
on_query(
320
    ResourceId,
321
    {ChannelId, Msg},
322
    #{pool_name := PoolName} = State
323
) ->
324
    ?TRACE(
7✔
325
        "QUERY",
7✔
326
        "send_msg_to_remote_node",
327
        #{
328
            message => Msg,
329
            connector => ResourceId,
330
            channel_id => ChannelId
331
        }
7✔
332
    ),
333
    Channels = maps:get(installed_channels, State),
7✔
334
    ChannelConfig = maps:get(ChannelId, Channels),
7✔
335
    case is_expected_to_have_workers(State) of
7✔
336
        true ->
337
            handle_send_result(with_egress_client(ChannelId, PoolName, send, [Msg, ChannelConfig]));
7✔
338
        false ->
339
            {error, {unrecoverable_error, <<"This node has no assigned static clientid.">>}}
×
340
    end;
341
on_query(ResourceId, {_ChannelId, Msg}, #{}) ->
342
    ?SLOG(error, #{
×
343
        msg => "forwarding_unavailable",
344
        resource_id => ResourceId,
345
        message => Msg,
346
        reason => "Egress is not configured"
347
    }).
×
348

349
on_query_async(
350
    ResourceId,
351
    {ChannelId, Msg},
352
    CallbackIn,
353
    #{pool_name := PoolName} = State
354
) ->
355
    ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
29✔
356
    Callback = {fun on_async_result/2, [CallbackIn]},
29✔
357
    Channels = maps:get(installed_channels, State),
29✔
358
    ChannelConfig = maps:get(ChannelId, Channels),
29✔
359
    case is_expected_to_have_workers(State) of
29✔
360
        true ->
361
            Result = with_egress_client(ChannelId, PoolName, send_async, [
29✔
362
                Msg, Callback, ChannelConfig
363
            ]),
364
            case Result of
29✔
365
                ok ->
366
                    ok;
×
367
                {ok, Pid} when is_pid(Pid) ->
368
                    {ok, Pid};
29✔
369
                {error, Reason} ->
370
                    {error, classify_error(Reason)}
×
371
            end;
372
        false ->
373
            {error, {unrecoverable_error, <<"This node has no assigned static clientid.">>}}
×
374
    end;
375
on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) ->
376
    ?SLOG(error, #{
×
377
        msg => "forwarding_unavailable",
378
        resource_id => ResourceId,
379
        message => Msg,
380
        reason => "Egress is not configured"
381
    }).
×
382

383
with_egress_client(ActionID, ResourceId, Fun, Args) ->
384
    TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ActionID),
36✔
385
    ecpool:pick_and_do(
36✔
386
        ResourceId, {emqx_bridge_mqtt_egress, Fun, [TraceRenderedCTX | Args]}, no_handover
387
    ).
388

389
on_async_result(Callback, Result) ->
390
    apply_callback_function(Callback, handle_send_result(Result)).
28✔
391

392
apply_callback_function(F, Result) when is_function(F) ->
393
    erlang:apply(F, [Result]);
×
394
apply_callback_function({F, A}, Result) when is_function(F), is_list(A) ->
395
    erlang:apply(F, A ++ [Result]);
28✔
396
apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) ->
397
    erlang:apply(M, F, A ++ [Result]).
×
398

399
handle_send_result(ok) ->
400
    ok;
29✔
401
handle_send_result({ok, #{reason_code := ?RC_SUCCESS}}) ->
402
    ok;
5✔
403
handle_send_result({ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}}) ->
404
    ok;
×
405
handle_send_result({ok, Reply}) ->
406
    {error, classify_reply(Reply)};
×
407
handle_send_result({error, Reason}) ->
408
    {error, classify_error(Reason)}.
1✔
409

410
classify_reply(Reply = #{reason_code := ?RC_PACKET_IDENTIFIER_IN_USE}) ->
411
    %% If `emqtt' client restarted, it may re-use packet ids that the remote broker still
412
    %% has memory of.  We should retry.
NEW
413
    {recoverable_error, Reply};
×
414
classify_reply(Reply = #{reason_code := _}) ->
415
    {unrecoverable_error, Reply}.
×
416

417
classify_error(disconnected = Reason) ->
418
    {recoverable_error, Reason};
1✔
419
classify_error(ecpool_empty) ->
420
    {recoverable_error, disconnected};
×
421
classify_error({disconnected, _RC, _} = Reason) ->
422
    {recoverable_error, Reason};
×
423
classify_error({shutdown, _} = Reason) ->
424
    {recoverable_error, Reason};
×
425
classify_error(shutdown = Reason) ->
426
    {recoverable_error, Reason};
×
427
classify_error(closed = Reason) ->
NEW
428
    {recoverable_error, Reason};
×
429
classify_error(tcp_closed = Reason) ->
NEW
430
    {recoverable_error, Reason};
×
431
classify_error(einval = Reason) ->
NEW
432
    {recoverable_error, Reason};
×
433
classify_error({unrecoverable_error, _Reason} = Error) ->
434
    Error;
×
435
classify_error(Reason) ->
436
    {unrecoverable_error, Reason}.
×
437

438
on_get_status(_ResourceId, State) ->
439
    Pools = maps:to_list(maps:with([pool_name], State)),
162✔
440
    Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)],
162✔
441
    try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of
162✔
442
        Statuses ->
443
            combine_status(Statuses, State)
162✔
444
    catch
445
        exit:timeout ->
446
            ?status_connecting
×
447
    end.
448

449
get_status({_Pool, Worker}) ->
450
    case ecpool_worker:client(Worker) of
869✔
451
        {ok, Client} ->
452
            emqx_bridge_mqtt_ingress:status(Client);
859✔
453
        {error, _} ->
454
            ?status_disconnected
10✔
455
    end.
456

457
combine_status(Statuses, ConnState) ->
458
    %% NOTE
459
    %% Natural order of statuses: [connected, connecting, disconnected]
460
    %% * `disconnected` wins over any other status
461
    %% * `connecting` wins over `connected`
462
    #{available_clientids := AvailableClientids} = ConnState,
162✔
463
    ExpectedNoClientids =
162✔
464
        case AvailableClientids of
465
            _ when length(AvailableClientids) == 0 ->
466
                true;
5✔
467
            _ ->
468
                false
157✔
469
        end,
470
    ToStatus = fun
162✔
471
        ({S, _Reason}) -> S;
×
472
        (S) when is_atom(S) -> S
1,428✔
473
    end,
474
    CompareFn = fun(S1A, S2A) ->
162✔
475
        S1 = ToStatus(S1A),
714✔
476
        S2 = ToStatus(S2A),
714✔
477
        S1 > S2
714✔
478
    end,
479
    case lists:usort(CompareFn, Statuses) of
162✔
480
        [{Status, Reason} | _] ->
481
            case explain_error(Reason) of
×
482
                undefined -> Status;
×
483
                Msg -> {Status, Msg}
×
484
            end;
485
        [Status | _] ->
486
            Status;
157✔
487
        [] when ExpectedNoClientids ->
488
            {?status_disconnected,
5✔
489
                {unhealthy_target, <<"Connector has no assigned static clientids">>}};
490
        [] ->
491
            ?status_disconnected
×
492
    end.
493

494
mk_ingress_config(
495
    ChannelId,
496
    IngressChannelConfig,
497
    TopicToHandlerIndex
498
) ->
499
    HookPoints = maps:get(hookpoints, IngressChannelConfig, []),
26✔
500
    NewConf = IngressChannelConfig#{
26✔
501
        on_message_received => {?MODULE, on_message_received, [HookPoints, ChannelId]},
502
        ingress_list => [IngressChannelConfig]
503
    },
504
    emqx_bridge_mqtt_ingress:config(NewConf, ChannelId, TopicToHandlerIndex).
26✔
505

506
mk_ecpool_client_opts(
507
    ResourceId,
508
    Config = #{
509
        server := Server,
510
        keepalive := KeepAlive,
511
        ssl := #{enable := EnableSsl} = Ssl
512
    }
513
) ->
514
    HostPort = emqx_bridge_mqtt_connector_schema:parse_server(Server),
163✔
515
    Options = maps:with(
163✔
516
        [
517
            proto_ver,
518
            username,
519
            password,
520
            clean_start,
521
            retry_interval,
522
            max_inflight,
523
            % Opening a connection in bridge mode will form a non-standard mqtt connection message.
524
            % A load balancing server (such as haproxy) is often set up before the emqx broker server.
525
            % When the load balancing server enables mqtt connection packet inspection,
526
            % non-standard mqtt connection packets might be filtered out by LB.
527
            bridge_mode,
528
            topic_to_handler_index
529
        ],
530
        Config
531
    ),
532
    Name = parse_id_to_name(ResourceId),
163✔
533
    mk_client_opt_password(Options#{
163✔
534
        hosts => [HostPort],
535
        clientid => clientid(Name, Config),
536
        connect_timeout => 30,
537
        keepalive => ms_to_s(KeepAlive),
538
        force_ping => true,
539
        ssl => EnableSsl,
540
        ssl_opts => maps:to_list(maps:remove(enable, Ssl))
541
    }).
542

543
parse_id_to_name(Id) ->
544
    {_Type, Name} = emqx_connector_resource:parse_connector_id(Id, #{atom_name => false}),
163✔
545
    Name.
163✔
546

547
mk_client_opt_password(Options = #{password := Secret}) ->
548
    %% TODO: Teach `emqtt` to accept 0-arity closures as passwords.
549
    Options#{password := emqx_secret:unwrap(Secret)};
70✔
550
mk_client_opt_password(Options) ->
551
    Options.
93✔
552

553
ms_to_s(Ms) ->
554
    erlang:ceil(Ms / 1000).
163✔
555

556
clientid(Name, _Conf = #{clientid_prefix := Prefix}) when
557
    is_binary(Prefix) andalso Prefix =/= <<>>
558
->
559
    {Prefix, emqx_bridge_mqtt_lib:clientid_base(Name)};
3✔
560
clientid(Name, _Conf) ->
561
    {?NO_PREFIX, emqx_bridge_mqtt_lib:clientid_base(Name)}.
160✔
562

563
%% @doc Start an ingress bridge worker.
564
-spec connect([option() | {ecpool_worker_id, pos_integer()}]) ->
565
    {ok, pid()} | {error, _Reason}.
566
connect(Options) ->
567
    WorkerId = proplists:get_value(ecpool_worker_id, Options),
1,128✔
568
    ?SLOG(debug, #{
1,128✔
569
        msg => "ingress_client_starting",
570
        options => emqx_utils:redact(Options)
571
    }),
1,128✔
572
    Name = proplists:get_value(name, Options),
1,128✔
573
    ClientOpts = proplists:get_value(client_opts, Options),
1,128✔
574
    AvailableClientids = proplists:get_value(available_clientids, Options),
1,128✔
575
    case emqtt:start_link(mk_emqtt_client_opts(Name, WorkerId, AvailableClientids, ClientOpts)) of
1,128✔
576
        {ok, Pid} ->
577
            connect(Pid, Name);
1,128✔
578
        {error, Reason} = Error ->
579
            IsDryRun = emqx_resource:is_dry_run(Name),
×
580
            ?SLOG(?LOG_LEVEL(IsDryRun), #{
×
581
                msg => "client_start_failed",
582
                resource_id => Name,
583
                config => emqx_utils:redact(ClientOpts),
584
                reason => Reason
585
            }),
×
586
            Error
×
587
    end.
588

589
mk_emqtt_client_opts(
590
    Name,
591
    WorkerId,
592
    AvailableClientids,
593
    ClientOpts = #{
594
        topic_to_handler_index := TopicToHandlerIndex
595
    }
596
) ->
597
    %% WorkerId :: 1..inf
598
    ClientOpts#{
1,128✔
599
        clientid := lists:nth(WorkerId, AvailableClientids),
600
        msg_handler => mk_client_event_handler(Name, TopicToHandlerIndex)
601
    }.
602

603
mk_clientid(WorkerId, {Prefix, ClientId}) when ?IS_NO_PREFIX(Prefix) ->
604
    %% When there is no prefix, try to keep the client ID length within 23 bytes
605
    emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId);
1,114✔
606
mk_clientid(WorkerId, {Prefix, ClientId}) when
607
    size(Prefix) =< ?MAX_PREFIX_BYTES
608
->
609
    %% Try to respect client ID prefix when it's no more than 19 bytes,
610
    %% meaning there are at least 4 bytes as hash space.
611
    emqx_bridge_mqtt_lib:bytes23_with_prefix(Prefix, ClientId, WorkerId);
2✔
612
mk_clientid(WorkerId, {Prefix, ClientId}) ->
613
    %% There is no other option but to use a long client ID
614
    iolist_to_binary([Prefix, ClientId, $:, integer_to_binary(WorkerId)]).
1✔
615

616
mk_client_event_handler(Name, TopicToHandlerIndex) ->
617
    #{
1,128✔
618
        publish => {fun emqx_bridge_mqtt_ingress:handle_publish/3, [Name, TopicToHandlerIndex]},
619
        disconnected => {fun ?MODULE:handle_disconnect/1, []}
620
    }.
621

622
-spec connect(pid(), name()) ->
623
    {ok, pid()} | {error, _Reason}.
624
connect(Pid, Name) ->
625
    case emqtt:connect(Pid) of
1,128✔
626
        {ok, _Props} ->
627
            {ok, Pid};
393✔
628
        {error, Reason} = Error ->
629
            IsDryRun = emqx_resource:is_dry_run(Name),
735✔
630
            log_connect_error_reason(?LOG_LEVEL(IsDryRun), Reason, Name),
735✔
631
            _ = catch emqtt:stop(Pid),
735✔
632
            Error
735✔
633
    end.
634

635
log_connect_error_reason(Level, {tcp_closed, _} = Reason, Name) ->
636
    ?tp(emqx_bridge_mqtt_connector_tcp_closed, #{}),
177✔
637
    ?SLOG(Level, #{
177✔
638
        msg => "ingress_client_connect_failed",
639
        reason => Reason,
640
        name => Name,
641
        explain => explain_error(Reason)
642
    });
×
643
log_connect_error_reason(Level, econnrefused = Reason, Name) ->
644
    ?tp(emqx_bridge_mqtt_connector_econnrefused_error, #{}),
285✔
645
    ?SLOG(Level, #{
285✔
646
        msg => "ingress_client_connect_failed",
647
        reason => Reason,
648
        name => Name,
649
        explain => explain_error(Reason)
650
    });
32✔
651
log_connect_error_reason(Level, Reason, Name) ->
652
    ?SLOG(Level, #{
273✔
653
        msg => "ingress_client_connect_failed",
654
        reason => Reason,
655
        name => Name
656
    }).
80✔
657

658
explain_error(econnrefused) ->
659
    <<
297✔
660
        "Connection refused. "
661
        "This error indicates that your connection attempt to the MQTT server was rejected. "
662
        "In simpler terms, the server you tried to connect to refused your request. "
663
        "There can be multiple reasons for this. "
664
        "For example, the MQTT server you're trying to connect to might be down or not "
665
        "running at all or you might have provided the wrong address "
666
        "or port number for the server."
667
    >>;
668
explain_error({tcp_closed, _}) ->
669
    <<
188✔
670
        "Your MQTT connection attempt was unsuccessful. "
671
        "It might be at its maximum capacity for handling new connections. "
672
        "To diagnose the issue further, you can check the server logs for "
673
        "any specific messages related to the unavailability or connection limits."
674
    >>;
675
explain_error(_Reason) ->
676
    undefined.
35✔
677

678
handle_disconnect(_Reason) ->
679
    ok.
1,112✔
680

681
take(Key, Map0, Default) ->
682
    case maps:take(Key, Map0) of
71✔
683
        {Value, Map} ->
684
            {Value, Map};
71✔
685
        error ->
686
            {Default, Map0}
×
687
    end.
688

689
is_expected_to_have_workers(#{available_clientids := []} = _ConnState) ->
690
    false;
×
691
is_expected_to_have_workers(_ConnState) ->
692
    true.
36✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc