• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 12259797823

10 Dec 2024 03:51PM UTC coverage: 81.991%. First build
12259797823

Pull #14375

github

web-flow
Merge 36d1c250a into 5c1e98911
Pull Request #14375: feat(kafka consumer): expose more error details when doing health checks / dry runs

8 of 12 new or added lines in 1 file covered. (66.67%)

56405 of 68794 relevant lines covered (81.99%)

15360.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.01
/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4
-module(emqx_bridge_kafka_impl_consumer).
5

6
-behaviour(emqx_resource).
7

8
%% `emqx_resource' API
9
-export([
10
    resource_type/0,
11
    callback_mode/0,
12
    query_mode/1,
13
    on_start/2,
14
    on_stop/2,
15
    on_get_status/2,
16

17
    on_add_channel/4,
18
    on_remove_channel/3,
19
    on_get_channels/1,
20
    on_get_channel_status/3
21
]).
22

23
%% `brod_group_subscriber_v2' API
24
-export([
25
    init/2,
26
    handle_message/2
27
]).
28

29
-ifdef(TEST).
30
-export([consumer_group_id/2]).
31
-endif.
32

33
-include_lib("emqx/include/logger.hrl").
34
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
35
%% needed for the #kafka_message record definition
36
-include_lib("brod/include/brod.hrl").
37
-include_lib("emqx_resource/include/emqx_resource.hrl").
38

39
-type connector_config() :: #{
40
    authentication := term(),
41
    bootstrap_hosts := binary(),
42
    connector_name := atom() | binary(),
43
    connector_type := atom() | binary(),
44
    socket_opts := _,
45
    ssl := _,
46
    any() => term()
47
}.
48
-type source_config() :: #{
49
    bridge_name := atom(),
50
    hookpoints := [binary()],
51
    parameters := source_parameters()
52
}.
53
-type source_parameters() :: #{
54
    group_id => binary(),
55
    key_encoding_mode := encoding_mode(),
56
    max_batch_bytes := emqx_schema:bytesize(),
57
    max_wait_time := non_neg_integer(),
58
    max_rejoin_attempts := non_neg_integer(),
59
    offset_commit_interval_seconds := pos_integer(),
60
    offset_reset_policy := offset_reset_policy(),
61
    topic := kafka_topic(),
62
    value_encoding_mode := encoding_mode(),
63
    topic_mapping => [one_topic_mapping()]
64
}.
65
-type one_topic_mapping() :: #{
66
    kafka_topic => kafka_topic(),
67
    mqtt_topic => emqx_types:topic(),
68
    qos => emqx_types:qos(),
69
    payload_template => string()
70
}.
71
-type subscriber_id() :: emqx_bridge_kafka_consumer_sup:child_id().
72
-type kafka_topic() :: brod:topic().
73
-type kafka_message() :: #kafka_message{}.
74
-type connector_state() :: #{
75
    kafka_client_id := brod:client_id(),
76
    installed_sources := #{source_resource_id() => source_state()}
77
}.
78
-type source_state() :: #{
79
    subscriber_id := subscriber_id(),
80
    kafka_client_id := brod:client_id(),
81
    kafka_topics := [kafka_topic()]
82
}.
83
-type offset_reset_policy() :: latest | earliest.
84
-type encoding_mode() :: none | base64.
85
-type consumer_init_data() :: #{
86
    hookpoints := [binary()],
87
    key_encoding_mode := encoding_mode(),
88
    resource_id := source_resource_id(),
89
    topic_mapping := #{
90
        kafka_topic() := #{
91
            payload_template => emqx_placeholder:tmpl_token(),
92
            mqtt_topic_template => emqx_placeholder:tmpl_token(),
93
            qos => emqx_types:qos()
94
        }
95
    },
96
    value_encoding_mode := encoding_mode()
97
}.
98
-type consumer_state() :: #{
99
    hookpoints := [binary()],
100
    kafka_topic := kafka_topic(),
101
    key_encoding_mode := encoding_mode(),
102
    resource_id := source_resource_id(),
103
    topic_mapping := #{
104
        kafka_topic() := #{
105
            payload_template => emqx_placeholder:tmpl_token(),
106
            mqtt_topic_template => emqx_placeholder:tmpl_token(),
107
            qos => emqx_types:qos()
108
        }
109
    },
110
    value_encoding_mode := encoding_mode()
111
}.
112
%% Shape of the first argument handed to `init/2' by the group subscriber.
%% Only `topic' is read by this module; the other keys are optional here.
-type subscriber_init_info() :: #{
    topic := brod:topic(),
    partition => brod:partition(),
    group_id => brod:group_id(),
    commit_fun => brod_group_subscriber_v2:commit_fun()
}.
118

119
-define(CLIENT_DOWN_MESSAGE,
120
    "Failed to start Kafka client. Please check the logs for errors and check"
121
    " the connection parameters."
122
).
123

124
%% Allocatable resources
125
-define(kafka_client_id, kafka_client_id).
126
-define(kafka_subscriber_id, kafka_subscriber_id).
127

128
%%-------------------------------------------------------------------------------------
129
%% `emqx_resource' API
130
%%-------------------------------------------------------------------------------------
131
%% Resource type tag for this implementation.
resource_type() ->
    kafka_consumer.

%% Callback mode announced to the resource layer.
callback_mode() -> async_if_possible.

%% consumer bridges don't need resource workers
query_mode(_Config) -> no_queries.
139

140
%% Starts the brod client for this connector.
%%
%% The client id is recorded as an allocated resource *before* the client is
%% started, so that `on_stop/2' can find it even without a connector state.
%% Throws `?CLIENT_DOWN_MESSAGE' when brod fails to start the client.
-spec on_start(connector_resource_id(), connector_config()) -> {ok, connector_state()}.
on_start(ConnectorResId, Config) ->
    #{
        authentication := Auth,
        bootstrap_hosts := RawHosts,
        connector_type := ConnectorType,
        connector_name := ConnectorName,
        socket_opts := RawSocketOpts,
        ssl := SSL
    } = Config,
    BootstrapHosts = emqx_bridge_kafka_impl:hosts(RawHosts),
    %% Note: this is distinct per node.
    ClientID = make_client_id(ConnectorResId, ConnectorType, ConnectorName),
    SaslOpts =
        case Auth of
            none -> [];
            _ -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}]
        end,
    TransportOpts = add_ssl_opts(SaslOpts, SSL),
    ExtraSockOpts = emqx_bridge_kafka_impl:socket_opts(RawSocketOpts),
    ClientOpts = [{extra_sock_opts, ExtraSockOpts} | TransportOpts],
    ok = emqx_resource:allocate_resource(ConnectorResId, ?kafka_client_id, ClientID),
    case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "failed_to_start_kafka_consumer_client",
                resource_id => ConnectorResId,
                kafka_hosts => BootstrapHosts,
                reason => emqx_utils:redact(Reason)
            }),
            throw(?CLIENT_DOWN_MESSAGE);
        ok ->
            ?tp(
                kafka_consumer_client_started,
                #{client_id => ClientID, resource_id => ConnectorResId}
            ),
            ?SLOG(info, #{
                msg => "kafka_consumer_client_started",
                resource_id => ConnectorResId,
                kafka_hosts => BootstrapHosts
            })
    end,
    ConnectorState = #{
        kafka_client_id => ClientID,
        installed_sources => #{}
    },
    {ok, ConnectorState}.
186

187
%% Stops every subscriber and the brod client that belong to the connector.
%%
%% With an `undefined' state the resources recorded through
%% `emqx_resource:allocate_resource/3' are the only source of truth, so those
%% are walked instead of the connector state.
-spec on_stop(connector_resource_id(), connector_state()) -> ok.
on_stop(ConnectorResId, undefined) ->
    Allocated = emqx_resource:get_allocated_resources(ConnectorResId),
    %% The accumulator counts how many subscribers were stopped.
    StopAllocated =
        fun
            (?kafka_client_id, ClientID, Count) ->
                stop_client(ClientID),
                Count;
            ({?kafka_subscriber_id, _SourceResId}, SubscriberId, Count) ->
                stop_subscriber(SubscriberId),
                Count + 1
        end,
    case maps:fold(StopAllocated, 0, Allocated) of
        0 ->
            ?tp(kafka_consumer_just_client_stopped, #{}),
            ok;
        _AtLeastOne ->
            ?tp(kafka_consumer_subcriber_and_client_stopped, #{}),
            ok
    end;
on_stop(ConnectorResId, State) ->
    #{
        installed_sources := InstalledSources,
        kafka_client_id := ClientID
    } = State,
    StopSource =
        fun(_SourceResId, #{subscriber_id := SubscriberId}) ->
            stop_subscriber(SubscriberId)
        end,
    maps:foreach(StopSource, InstalledSources),
    stop_client(ClientID),
    ?tp(kafka_consumer_subcriber_and_client_stopped, #{instance_id => ConnectorResId}),
    ok.
224

225
%% Reports the connector health.
%%
%% When the brod client process is registered, the result comes from
%% `check_client_connectivity/1', which may attach an error term to the
%% disconnected status; the spec therefore includes the tuple form.
%% A missing client process, or a state without a client id, is reported as
%% plain `?status_disconnected'.
-spec on_get_status(connector_resource_id(), connector_state()) ->
    ?status_connected
    | ?status_disconnected
    | {?status_disconnected, term()}.
on_get_status(_ConnectorResId, #{kafka_client_id := ClientID}) ->
    case whereis(ClientID) of
        ClientPid when is_pid(ClientPid) ->
            check_client_connectivity(ClientPid);
        _NotRunning ->
            ?status_disconnected
    end;
on_get_status(_ConnectorResId, _State) ->
    ?status_disconnected.
236

237
%% Starts a consumer for the given source and records it in the connector
%% state under `installed_sources'.
%%
%% Returns `{error, Reason}' unchanged when `start_consumer/5' reports a
%% failure, so the spec lists that return as well.  `start_consumer/5' may
%% also throw a binary message when the source repeats a topic already
%% consumed through this connector (see `ensure_no_repeated_topics/2').
-spec on_add_channel(
    connector_resource_id(),
    connector_state(),
    source_resource_id(),
    source_config()
) ->
    {ok, connector_state()} | {error, term()}.
on_add_channel(ConnectorResId, ConnectorState0, SourceResId, SourceConfig) ->
    #{
        kafka_client_id := ClientID,
        installed_sources := InstalledSources0
    } = ConnectorState0,
    case start_consumer(SourceConfig, ConnectorResId, SourceResId, ClientID, ConnectorState0) of
        {ok, SourceState} ->
            InstalledSources = InstalledSources0#{SourceResId => SourceState},
            {ok, ConnectorState0#{installed_sources := InstalledSources}};
        {error, _} = Error ->
            Error
    end.
257

258
%% Stops the subscriber of the given source (when it is installed), releases
%% its allocated subscriber id and drops it from `installed_sources'.
%% Removing a source that is not installed leaves the state untouched.
-spec on_remove_channel(
    connector_resource_id(),
    connector_state(),
    source_resource_id()
) ->
    {ok, connector_state()}.
on_remove_channel(ConnectorResId, ConnectorState0, SourceResId) ->
    #{installed_sources := Installed0} = ConnectorState0,
    Installed =
        case maps:take(SourceResId, Installed0) of
            error ->
                Installed0;
            {SourceState, Remaining} ->
                #{subscriber_id := SubscriberId} = SourceState,
                stop_subscriber(SubscriberId),
                deallocate_subscriber_id(ConnectorResId, SourceResId),
                Remaining
        end,
    {ok, ConnectorState0#{installed_sources := Installed}}.
277

278
%% Lists the channels configured for this connector, as reported by
%% `emqx_bridge_v2'.
-spec on_get_channels(connector_resource_id()) ->
    [{action_resource_id(), source_config()}].
on_get_channels(ConnectorResId) ->
    Channels = emqx_bridge_v2:get_channels_for_connector(ConnectorResId),
    Channels.
282

283
%% Reports the health of one installed source.
%%
%% For an installed source the result is whatever `do_get_status/3' returns:
%% `?status_connected', a `{Status, Message}' pair whose message is either a
%% binary or a plain string, or bare `?status_disconnected'.  Sources that are
%% not installed are reported as `?status_disconnected'.  The spec covers all
%% of these shapes.
-spec on_get_channel_status(
    connector_resource_id(),
    source_resource_id(),
    connector_state()
) ->
    ?status_connected
    | ?status_disconnected
    | {?status_disconnected | ?status_connecting, _Msg :: binary() | string()}.
on_get_channel_status(
    _ConnectorResId,
    SourceResId,
    ConnectorState = #{installed_sources := InstalledSources}
) when is_map_key(SourceResId, InstalledSources) ->
    #{kafka_client_id := ClientID} = ConnectorState,
    #{
        kafka_topics := KafkaTopics,
        subscriber_id := SubscriberId
    } = maps:get(SourceResId, InstalledSources),
    do_get_status(ClientID, KafkaTopics, SubscriberId);
on_get_channel_status(_ConnectorResId, _SourceResId, _ConnectorState) ->
    ?status_disconnected.
302

303
%%-------------------------------------------------------------------------------------
304
%% `brod_group_subscriber' API
305
%%-------------------------------------------------------------------------------------
306

307
%% Subscriber callback: stores the topic taken from the init info in the
%% consumer state under `kafka_topic'.
-spec init(subscriber_init_info(), consumer_init_data()) -> {ok, consumer_state()}.
init(GroupData, InitData) ->
    ?tp(kafka_consumer_subscriber_init, #{group_data => GroupData, state => InitData}),
    #{topic := KafkaTopic} = GroupData,
    {ok, InitData#{kafka_topic => KafkaTopic}}.
313

314
%% Subscriber callback: wraps the actual handling in a trace span.
-spec handle_message(kafka_message(), consumer_state()) -> {ok, commit, consumer_state()}.
handle_message(KafkaMessage, ConsumerState) ->
    ?tp_span(
        kafka_consumer_handle_message,
        #{message => KafkaMessage, state => ConsumerState},
        do_handle_message(KafkaMessage, ConsumerState)
    ).
321

322
%% Converts the Kafka record into a map, optionally publishes it through the
%% legacy topic mapping, runs every hookpoint with it and bumps the
%% `received' metric of the source.
do_handle_message(Message, State) ->
    #{
        hookpoints := Hookpoints,
        kafka_topic := KafkaTopic,
        key_encoding_mode := KeyEncodingMode,
        resource_id := SourceResId,
        topic_mapping := TopicMapping,
        value_encoding_mode := ValueEncodingMode
    } = State,
    Headers = maps:from_list(Message#kafka_message.headers),
    Key = encode(Message#kafka_message.key, KeyEncodingMode),
    Value = encode(Message#kafka_message.value, ValueEncodingMode),
    FullMessage = #{
        headers => Headers,
        key => Key,
        offset => Message#kafka_message.offset,
        topic => KafkaTopic,
        ts => Message#kafka_message.ts,
        ts_type => Message#kafka_message.ts_type,
        value => Value
    },
    %% An empty mapping entry means there is nothing to publish the legacy way.
    LegacyMQTTConfig = maps:get(KafkaTopic, TopicMapping, #{}),
    legacy_maybe_publish_mqtt_message(LegacyMQTTConfig, SourceResId, FullMessage),
    RunHook = fun(Hookpoint) -> emqx_hooks:run(Hookpoint, [FullMessage]) end,
    lists:foreach(RunHook, Hookpoints),
    emqx_resource_metrics:received_inc(SourceResId),
    %% note: just `ack' does not commit the offset to the
    %% kafka consumer group.
    {ok, commit, State}.
347

348
%% Publishes the Kafka message as an MQTT message when the legacy mapping
%% provides a payload template, a QoS and a non-empty topic template.
%% Any other mapping shape is a no-op.
legacy_maybe_publish_mqtt_message(
    #{
        payload_template := PayloadTemplate,
        qos := QoS,
        mqtt_topic_template := TopicTemplate
    },
    SourceResId,
    FullMessage
) when TopicTemplate =/= <<>> ->
    Payload = render(FullMessage, PayloadTemplate),
    Topic = render(FullMessage, TopicTemplate),
    _ = emqx_broker:safe_publish(emqx_message:make(SourceResId, QoS, Topic, Payload)),
    ok;
legacy_maybe_publish_mqtt_message(_MQTTConfig, _SourceResId, _FullMessage) ->
    ok.
364

365
%%-------------------------------------------------------------------------------------
366
%% Helper fns
367
%%-------------------------------------------------------------------------------------
368

369
%% Prepends the `ssl' client option unless SSL is explicitly disabled.
add_ssl_opts(ClientOpts, #{enable := false}) ->
    ClientOpts;
add_ssl_opts(ClientOpts, SSL) ->
    SSLOpts = emqx_tls_lib:to_client_opts(SSL),
    [{ssl, SSLOpts} | ClientOpts].
373

374
%% Builds the supervisor child id used for the subscriber of a bridge.
-spec make_subscriber_id(atom() | binary()) -> emqx_bridge_kafka_consumer_sup:child_id().
make_subscriber_id(BridgeName) ->
    <<"kafka_subscriber:", (to_bin(BridgeName))/binary>>.
378

379
%% Starts the group subscriber for one source.
%%
%% Steps, in order: derive the topic mapping and consumer group id, translate
%% the offset reset policy, refuse topics already consumed through this
%% connector (throws a binary message), record the subscriber id as an
%% allocated resource and finally start the supervised subscriber.
-spec start_consumer(
    source_config(),
    connector_resource_id(),
    source_resource_id(),
    brod:client_id(),
    connector_state()
) ->
    {ok, source_state()} | {error, term()}.
start_consumer(Config, ConnectorResId, SourceResId, ClientID, ConnState) ->
    #{
        bridge_name := BridgeName,
        hookpoints := Hookpoints,
        parameters := #{
            key_encoding_mode := KeyEncodingMode,
            max_batch_bytes := MaxBatchBytes,
            max_wait_time := MaxWaitTime,
            max_rejoin_attempts := MaxRejoinAttempts,
            offset_commit_interval_seconds := OffsetCommitInterval,
            offset_reset_policy := ResetPolicy,
            topic := _Topic,
            value_encoding_mode := ValueEncodingMode
        } = Params
    } = Config,
    ?tp(kafka_consumer_sup_started, #{}),
    TopicMapping = ensure_topic_mapping(Params),
    InitData = #{
        key_encoding_mode => KeyEncodingMode,
        hookpoints => Hookpoints,
        resource_id => SourceResId,
        topic_mapping => TopicMapping,
        value_encoding_mode => ValueEncodingMode
    },
    %% note: the group id should be the same for all nodes in the
    %% cluster, so that the load gets distributed between all
    %% consumers and we don't repeat messages in the same cluster.
    GroupID = consumer_group_id(Params, BridgeName),
    BrodResetPolicy =
        case ResetPolicy of
            latest -> reset_to_latest;
            earliest -> reset_to_earliest
        end,
    ConsumerConfig = [
        %% earliest or latest
        {begin_offset, ResetPolicy},
        {max_bytes, MaxBatchBytes},
        {max_wait_time, MaxWaitTime},
        {offset_reset_policy, BrodResetPolicy}
    ],
    GroupConfig = [
        {max_rejoin_attempts, MaxRejoinAttempts},
        {offset_commit_interval_seconds, OffsetCommitInterval}
    ],
    KafkaTopics = maps:keys(TopicMapping),
    ensure_no_repeated_topics(KafkaTopics, ConnState),
    SubscriberConfig = #{
        client => ClientID,
        group_id => GroupID,
        topics => KafkaTopics,
        cb_module => ?MODULE,
        init_data => InitData,
        message_type => message,
        consumer_config => ConsumerConfig,
        group_config => GroupConfig
    },
    %% A single group subscriber worker is started, with no pool: that worker
    %% spawns one worker for each assigned topic-partition automatically, so
    %% we should not spawn duplicate workers.
    SubscriberId = make_subscriber_id(BridgeName),
    ?tp(kafka_consumer_about_to_start_subscriber, #{}),
    ok = allocate_subscriber_id(ConnectorResId, SourceResId, SubscriberId),
    ?tp(kafka_consumer_subscriber_allocated, #{}),
    case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, SubscriberConfig) of
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "failed_to_start_kafka_consumer",
                resource_id => SourceResId,
                reason => emqx_utils:redact(Reason)
            }),
            {error, Reason};
        {ok, _ConsumerPid} ->
            ?tp(
                kafka_consumer_subscriber_started,
                #{resource_id => SourceResId, subscriber_id => SubscriberId}
            ),
            SourceState = #{
                subscriber_id => SubscriberId,
                kafka_client_id => ClientID,
                kafka_topics => KafkaTopics
            },
            {ok, SourceState}
    end.
472

473
%% Currently, brod treats a consumer process to a specific topic as a singleton (per
%% client id / connector), meaning that the first subscriber to a given topic will define
%% the consumer options for all other consumers, and those options persist even after the
%% original consumer group is terminated.  We enforce that, if the user wants to consume
%% multiple times from the same topic, then they must create a different connector.
%%
%% Returns `ok' when none of the requested topics is already used by an
%% installed source; otherwise throws a binary message naming the clashes.
ensure_no_repeated_topics(KafkaTopics, ConnState) ->
    #{installed_sources := Sources} = ConnState,
    TopicsOf = fun(#{kafka_topics := Topics}) -> Topics end,
    InstalledTopics = lists:flatmap(TopicsOf, maps:values(Sources)),
    NewTopics = KafkaTopics -- InstalledTopics,
    case NewTopics =:= KafkaTopics of
        true ->
            %% all new topics
            ok;
        false ->
            Clashing = lists:join(<<", ">>, KafkaTopics -- NewTopics),
            throw(
                iolist_to_binary([
                    <<"Topics ">>,
                    Clashing,
                    <<" already exist in other sources associated with this connector.">>,
                    <<" If you want to repeat topics, create new connector and source(s).">>
                ])
            )
    end.
496

497
%% This is to ensure backwards compatibility with the deprecated topic mapping.
498
-spec ensure_topic_mapping(source_parameters()) -> #{kafka_topic() := map()}.
ensure_topic_mapping(#{topic_mapping := [_ | _] = LegacyMapping}) ->
    %% Legacy config with a non-empty topic mapping: it takes precedence and the
    %% single `topic' is ignored, so that the bridge keeps working as before.
    convert_topic_mapping(LegacyMapping);
ensure_topic_mapping(#{topic := KafkaTopic}) ->
    %% No topic mapping: generate one without MQTT templates.
    NoTemplates = #{},
    #{KafkaTopic => NoTemplates}.
506

507
%% Best-effort removal of a subscriber child; failures are logged, never raised.
-spec stop_subscriber(emqx_bridge_kafka_consumer_sup:child_id()) -> ok.
stop_subscriber(SubscriberId) ->
    LogData = #{
        msg => "failed_to_delete_kafka_subscriber",
        subscriber_id => SubscriberId
    },
    DeleteChild =
        fun() ->
            try
                emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId)
            catch
                %% may happen when node is shutting down
                exit:{noproc, _} -> ok
            end
        end,
    _ = log_when_error(DeleteChild, LogData),
    ok.
525

526
%% Best-effort stop of the brod client; failures are logged, never raised.
-spec stop_client(brod:client_id()) -> ok.
stop_client(ClientID) ->
    LogData = #{
        msg => "failed_to_delete_kafka_consumer_client",
        client_id => ClientID
    },
    StopClient = fun() -> brod:stop_client(ClientID) end,
    _ = log_when_error(StopClient, LogData),
    ok.
538

539
%% Checks the topics one by one and stops at the first one that is not
%% connected.  An exhausted topic list means everything was connected.
do_get_status(_ClientID, [], _SubscriberId) ->
    ?status_connected;
do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
    case brod:get_partitions_count(ClientID, KafkaTopic) of
        {ok, NPartitions} ->
            case do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of
                ?status_connected ->
                    do_get_status(ClientID, RestTopics, SubscriberId);
                {Status, _Message} = NotConnected when Status =/= ?status_connected ->
                    NotConnected
            end;
        {error, {client_down, Context}} ->
            {?status_disconnected, client_down_message(infer_client_error(Context))};
        {error, leader_not_available} ->
            {?status_disconnected,
                "Leader connection not available. Please check the Kafka topic used,"
                " the connection parameters and Kafka cluster health"};
        _ ->
            ?status_disconnected
    end.

%% Human-readable message (a plain string) for a client that is down, based
%% on the classification produced by `infer_client_error/1'.
client_down_message(auth_error) ->
    "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE;
client_down_message({auth_error, Details}) ->
    binary_to_list(Details) ++ "; " ++ ?CLIENT_DOWN_MESSAGE;
client_down_message(connection_refused) ->
    "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE;
client_down_message(_Other) ->
    ?CLIENT_DOWN_MESSAGE.
572

573
%% Health of a single topic: every partition must have a leader connection
%% and every local subscriber worker must be alive.
-spec do_get_topic_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
    ?status_connected | {?status_disconnected | ?status_connecting, _Msg :: binary()}.
do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
    LeaderResults = [
        {Partition, brod_client:get_leader_connection(ClientID, KafkaTopic, Partition)}
     || Partition <- lists:seq(0, NPartitions - 1)
    ],
    WorkersAlive = are_subscriber_workers_alive(SubscriberId),
    case {check_leader_connection_results(LeaderResults), WorkersAlive} of
        {ok, true} ->
            ?status_connected;
        {ok, false} ->
            {?status_connecting, <<"Subscription workers restarting">>};
        {{error, no_leaders}, _} ->
            {?status_disconnected, <<"No leaders available (no partitions?)">>};
        {{error, {Partition, Reason}}, _} ->
            Text = io_lib:format(
                "Leader for partition ~b unavailable; reason: ~0p",
                [Partition, emqx_utils:redact(Reason)]
            ),
            {?status_disconnected, iolist_to_binary(Text)}
    end.
600

601
%% Returns `ok' when every partition has a leader connection, the first
%% `{error, {Partition, Reason}}' otherwise, and `{error, no_leaders}' for an
%% empty result list (the initial accumulator is never replaced then).
check_leader_connection_results(Results) ->
    Step =
        fun
            ({_Partition, {ok, _}}, _Acc) ->
                {cont, ok};
            ({Partition, {error, Reason}}, _Acc) ->
                {halt, {error, {Partition, Reason}}}
        end,
    emqx_utils:foldl_while(Step, {error, no_leaders}, Results).
612

613
%% Tells whether the subscriber child exists and all of its local workers are
%% alive.
%%
%% `supervisor:which_children/1' reports the child process as a pid,
%% `undefined' (not running) or `restarting'.  The latter two both mean that
%% the workers are not usable right now, so both yield `false' instead of
%% crashing the health check with a `case_clause'.
are_subscriber_workers_alive(SubscriberId) ->
    try
        Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup),
        case lists:keyfind(SubscriberId, 1, Children) of
            false ->
                false;
            {_, undefined, _, _} ->
                false;
            {_, restarting, _, _} ->
                false;
            {_, Pid, _, _} when is_pid(Pid) ->
                Workers = brod_group_subscriber_v2:get_workers(Pid),
                %% we can't enforce the number of partitions on a single
                %% node, as the group might be spread across an emqx
                %% cluster.
                lists:all(fun is_process_alive/1, maps:values(Workers))
        end
    catch
        exit:{noproc, _} ->
            false;
        exit:{shutdown, _} ->
            %% may happen if node is shutting down
            false
    end.
635

636
%% Runs `Fun' and returns its result; any exception is logged at error level
%% together with the fields of `Log' instead of being propagated.
log_when_error(Fun, Log) ->
    try Fun() of
        Result -> Result
    catch
        Class:Reason ->
            ?SLOG(error, Log#{
                exception => Class,
                reason => Reason
            })
    end.
646

647
%% Uses the configured group id when it is a non-empty binary; otherwise
%% derives one from the bridge name.
-spec consumer_group_id(#{group_id => binary(), any() => term()}, atom() | binary()) -> binary().
consumer_group_id(#{group_id := GroupId}, _BridgeName) when
    is_binary(GroupId), GroupId =/= <<>>
->
    GroupId;
consumer_group_id(_ConsumerParams, BridgeName) ->
    <<"emqx-kafka-consumer-", (to_bin(BridgeName))/binary>>.
655

656
%% Probes the broker connection of a running brod client.
-spec check_client_connectivity(pid()) ->
    ?status_connected
    | ?status_disconnected
    | {?status_disconnected, term()}.
check_client_connectivity(ClientPid) ->
    %% We use a fake group id just to probe the connection, as `get_group_coordinator'
    %% will ensure a connection to the broker.
    ProbeGroupId = <<"____emqx_consumer_probe">>,
    ProbeResult = brod_client:get_group_coordinator(ClientPid, ProbeGroupId),
    case ProbeResult of
        {ok, _Metadata} ->
            ?status_connected;
        {error, client_down} ->
            ?status_disconnected;
        {error, {client_down, Reason}} ->
            %% `brod' should have already logged the client being down.
            {?status_disconnected, maybe_clean_error(Reason)};
        {error, Reason} ->
            %% `brod' should have already logged the client being down.
            {?status_disconnected, maybe_clean_error(Reason)}
    end.
676

677
%% Attempt to make the returned error a bit more friendly.
%% Recognised shapes are condensed; anything else is returned untouched.
maybe_clean_error([{{Host, Port}, {nxdomain, _Stacktrace}} | _]) when is_integer(Port) ->
    Endpoint = iolist_to_binary([Host, ":", integer_to_binary(Port)]),
    {Endpoint, nxdomain};
maybe_clean_error([{error_code, Code}, {error_msg, Msg} | _]) ->
    {Code, Msg};
maybe_clean_error(Reason) ->
    Reason.
688

689
%% Picks the registered name of the brod client for this connector.
-spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom().
make_client_id(ConnectorResId, BridgeType, BridgeName) ->
    case emqx_resource:is_dry_run(ConnectorResId) of
        true ->
            %% It is a dry run and we don't want to leak too many
            %% atoms.
            probing_brod_consumers;
        false ->
            binary_to_atom(emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName))
    end.
700

701
%% Turns the legacy topic mapping list into a map keyed by Kafka topic, with
%% both templates pre-processed.  A topic listed more than once keeps its
%% last entry.
convert_topic_mapping(TopicMappingList) ->
    ToEntry =
        fun(Fields) ->
            #{
                kafka_topic := KafkaTopic,
                mqtt_topic := RawTopicTemplate,
                qos := QoS,
                payload_template := RawPayloadTemplate
            } = Fields,
            PayloadTemplate = emqx_placeholder:preproc_tmpl(RawPayloadTemplate),
            TopicTemplate = emqx_placeholder:preproc_tmpl(RawTopicTemplate),
            {KafkaTopic, #{
                payload_template => PayloadTemplate,
                mqtt_topic_template => TopicTemplate,
                qos => QoS
            }}
        end,
    maps:from_list(lists:map(ToEntry, TopicMappingList)).
723

724
%% Renders a pre-processed template against the message map, producing a
%% binary.  `undefined' variables render as the empty binary.
render(FullMessage, PayloadTemplate) ->
    VarToBin =
        fun
            (undefined) -> <<>>;
            (Value) -> emqx_utils_conv:bin(Value)
        end,
    RenderOpts = #{return => full_binary, var_trans => VarToBin},
    emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, RenderOpts).
735

736
%% Applies the configured encoding mode to a key or value.
encode(Data, base64) -> base64:encode(Data);
encode(Data, none) -> Data.
740

741
%% Normalises an atom or binary name to a binary.
to_bin(Name) when is_atom(Name) ->
    atom_to_binary(Name, utf8);
to_bin(Name) when is_binary(Name) ->
    Name.
743

744
%% Classifies the context attached to a `client_down' error by looking at the
%% first endpoint entry only.  Unknown shapes yield `undefined'.
infer_client_error([{_BrokerEndpoint, {econnrefused, _}} | _]) ->
    connection_refused;
infer_client_error([{_BrokerEndpoint, {{sasl_auth_error, Message}, _}} | _]) when
    is_binary(Message)
->
    {auth_error, Message};
infer_client_error([{_BrokerEndpoint, {{sasl_auth_error, _}, _}} | _]) ->
    auth_error;
infer_client_error(_Other) ->
    undefined.
755

756
%% Records the subscriber id of a source as an allocated resource of the
%% connector, keyed by `{?kafka_subscriber_id, SourceResId}'.
allocate_subscriber_id(ConnectorResId, SourceResId, SubscriberId) ->
    ResourceKey = {?kafka_subscriber_id, SourceResId},
    ok = emqx_resource:allocate_resource(ConnectorResId, ResourceKey, SubscriberId).
762

763
%% Drops the allocated-resource entry keyed by
%% `{?kafka_subscriber_id, SourceResId}' for the connector.
deallocate_subscriber_id(ConnectorResId, SourceResId) ->
    ResourceKey = {?kafka_subscriber_id, SourceResId},
    ok = emqx_resource:deallocate_resource(ConnectorResId, ResourceKey).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc