• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 15070759477

16 May 2025 02:26PM UTC coverage: 82.508%. First build
15070759477

Pull #15221

github

web-flow
Merge 63f6fb0dc into b895cdcd8
Pull Request #15221: port of #14989 to release-58

30 of 37 new or added lines in 3 files covered. (81.08%)

57882 of 70153 relevant lines covered (82.51%)

14993.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.78
/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2022-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4

5
-module(emqx_bridge_kinesis_impl_producer).
6

7
-feature(maybe_expr, enable).
8

9
-include_lib("emqx/include/logger.hrl").
10
-include_lib("emqx_resource/include/emqx_resource.hrl").
11
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
12

13
-define(HEALTH_CHECK_TIMEOUT, 15000).
14
-define(TOPIC_MESSAGE,
15
    "Kinesis stream is invalid. Please check if the stream exist in Kinesis account."
16
).
17

18
%% Connector configuration accepted by `on_start/2'.
%% Only `pool_size' is read directly in this module; the whole map is handed
%% to the pool workers through the `config' option (see `connect/1').
%% `instance_id' is optional on input because `on_start/2' adds it before
%% passing the config on.
-type config_connector() :: #{
19
    aws_access_key_id := binary(),
20
    aws_secret_access_key := emqx_secret:t(binary()),
21
    endpoint := binary(),
22
    max_retries := non_neg_integer(),
23
    pool_size := non_neg_integer(),
24
    instance_id => resource_id(),
25
    any() => term()
26
}.
27
%% Connector state built by `on_start/2'.
%% `pool_name' is the resource id the worker pool was started under;
%% `installed_channels' maps a channel id to the channel state produced by
%% `create_channel_state/1'.
-type state() :: #{
28
    pool_name := resource_id(),
29
    installed_channels := map()
30
}.
31
-export_type([config_connector/0]).
32

33
%% `emqx_resource' API
34
-export([
35
    resource_type/0,
36
    callback_mode/0,
37
    on_start/2,
38
    on_stop/2,
39
    on_query/3,
40
    on_batch_query/3,
41
    on_get_status/2,
42
    on_add_channel/4,
43
    on_remove_channel/3,
44
    on_get_channels/1,
45
    on_get_channel_status/3,
46
    on_format_query_result/1
47
]).
48

49
-export([
50
    connect/1,
51
    do_connector_health_check/1,
52
    do_channel_health_check/2
53
]).
54

55
%%-------------------------------------------------------------------------------------------------
56
%% `emqx_resource' API
57
%%-------------------------------------------------------------------------------------------------
58
%% Resource type atom reported for this connector.
resource_type() ->
    kinesis_producer.

%% Callback mode declared by this resource implementation.
callback_mode() ->
    always_sync.
36✔
61

62
%% Starts the worker pool backing this connector.
%%
%% The connector config, extended with `instance_id', is passed to the pool as
%% the `config' option together with the configured `pool_size'. When the pool
%% starts, the initial state (pool name plus an empty channel registry) is
%% returned; any other result of the pool start is returned unchanged.
-spec on_start(resource_id(), config_connector()) -> {ok, state()} | {error, term()}.
on_start(InstanceId, #{pool_size := PoolSize} = RawConfig) ->
    ?SLOG(info, #{
        msg => "starting_kinesis_bridge",
        connector => InstanceId,
        config => redact(RawConfig)
    }),
    WorkerConfig = RawConfig#{instance_id => InstanceId},
    PoolOpts = [{config, WorkerConfig}, {pool_size, PoolSize}],
    case emqx_resource_pool:start(InstanceId, ?MODULE, PoolOpts) of
        ok ->
            ?tp(emqx_bridge_kinesis_impl_producer_start_ok, #{config => WorkerConfig}),
            {ok, #{pool_name => InstanceId, installed_channels => #{}}};
        StartFailure ->
            ?tp(emqx_bridge_kinesis_impl_producer_start_failed, #{config => WorkerConfig}),
            StartFailure
    end.
92

93
%% Stops the worker pool; the pool is registered under the instance id
%% (see `on_start/2'), so the state is not needed.
-spec on_stop(resource_id(), state()) -> ok | {error, term()}.
on_stop(InstanceId, _State) ->
    PoolName = InstanceId,
    emqx_resource_pool:stop(PoolName).
38✔
96

97
%% Connector-level health check.
%%
%% Asks `emqx_resource_pool' to run `do_connector_health_check/1' on the pool
%% workers and maps the outcome as follows:
%%   * `ok' -> `?status_connected';
%%   * a nested `nxdomain' / `econnrefused' error -> `{error, Explanation}',
%%     with the explanation produced by `emqx_utils:explain_posix/1';
%%   * any other error -> logged, then `{?status_disconnected, Reason}'.
%%
%% The spec lists exactly the shapes this function returns.
-spec on_get_status(resource_id(), state()) ->
    ?status_connected
    | {?status_disconnected, term()}
    | {error, term()}.
on_get_status(_InstanceId, #{pool_name := Pool} = _State) ->
    case
        emqx_resource_pool:health_check_workers_optimistic(
            Pool,
            {?MODULE, do_connector_health_check, []},
            ?HEALTH_CHECK_TIMEOUT
        )
    of
        ok ->
            ?status_connected;
        {error, {error, {Posix, _}}} when Posix =:= nxdomain; Posix =:= econnrefused ->
            %% These two posix errors are reported with a readable explanation
            %% and are intentionally not logged here.
            {error, emqx_utils:explain_posix(Posix)};
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "kinesis_producer_connector_get_status_failed",
                reason => Reason
            }),
            {?status_disconnected, Reason}
    end.
122

123
%% Health check executed per pool worker for the connector itself.
%% Returns `ok' when the client reports `{ok, connected}'; any other value
%% returned by the client is passed through unchanged.
do_connector_health_check(WorkerPid) ->
    case emqx_bridge_kinesis_connector_client:connection_status(WorkerPid) of
        {ok, connected} ->
            ok;
        NotConnected ->
            NotConnected
    end.
129

130
%% Health check executed per pool worker for a single stream (channel).
%% `{ok, connected}' becomes `ok'; `unhealthy_target' is wrapped in a `halt'
%% tuple (presumably so the pool stops checking further workers -- confirm
%% against `emqx_resource_pool'); other errors are returned as they are.
do_channel_health_check(WorkerPid, StreamName) ->
    Status = emqx_bridge_kinesis_connector_client:connection_status(WorkerPid, StreamName),
    case Status of
        {ok, connected} ->
            ok;
        {error, Reason} when Reason =:= unhealthy_target ->
            {halt, {error, Reason}};
        {error, _} = Failure ->
            Failure
    end.
139

140
%% Registers a channel: builds its state from the channel config and stores it
%% under `ChannelId' in `installed_channels' (replacing any previous entry).
on_add_channel(_InstId, #{installed_channels := Channels0} = State0, ChannelId, ChannelConfig) ->
    {ok, ChannelState} = create_channel_state(ChannelConfig),
    Channels = Channels0#{ChannelId => ChannelState},
    {ok, State0#{installed_channels => Channels}}.
29✔
153

154
%% Builds the per-channel state from the channel's `parameters':
%% the pre-processed templates plus the raw stream name and partition key.
create_channel_state(#{parameters := Params}) ->
    #{stream_name := Stream, partition_key := Key} = Params,
    Templates = parse_template(Params),
    {ok, #{
        templates => Templates,
        stream_name => Stream,
        partition_key => Key
    }}.
166

167
%% Drops `ChannelId' from `installed_channels'; removing an unknown channel
%% leaves the state unchanged.
on_remove_channel(_InstId, #{installed_channels := Channels0} = State0, ChannelId) ->
    {ok, State0#{installed_channels => maps:remove(ChannelId, Channels0)}}.
28✔
178

179
%% Channel-level health check.
%%
%% Looks up the channel's stream name and asks `emqx_resource_pool' to run
%% `do_channel_health_check/2' for it on the pool workers. An
%% `unhealthy_target' error is reported with the stream-specific message;
%% any other error is logged and reported as disconnected with its reason.
%% Crashes if `ChannelId' is not installed.
on_get_channel_status(_ResId, ChannelId, #{pool_name := Pool, installed_channels := Channels}) ->
    #{stream_name := StreamName} = maps:get(ChannelId, Channels),
    CheckMFA = {?MODULE, do_channel_health_check, [StreamName]},
    HealthResult = emqx_resource_pool:health_check_workers_optimistic(
        Pool,
        CheckMFA,
        ?HEALTH_CHECK_TIMEOUT
    ),
    case HealthResult of
        ok ->
            ?status_connected;
        {error, unhealthy_target} ->
            {?status_disconnected, {unhealthy_target, ?TOPIC_MESSAGE}};
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "kinesis_producer_channel_get_status_failed",
                reason => Reason
            }),
            {?status_disconnected, Reason}
    end.
206

207
%% Delegates to `emqx_bridge_v2' to list the channels of this connector.
on_get_channels(ConnectorResId) ->
    emqx_bridge_v2:get_channels_for_connector(ConnectorResId).
218✔
209

210
%% Sends a single message synchronously by wrapping it in a one-element
%% request list and going through the same path as batches.
-spec on_query(
    resource_id(),
    {channel_id(), map()},
    state()
) ->
    {ok, map()}
    | {error, {recoverable_error, term()}}
    | {error, term()}.
on_query(ResourceId, {ChannelId, Message} = Request, State) ->
    ?tp(emqx_bridge_kinesis_impl_producer_sync_query, #{message => Message}),
    do_send_requests_sync(ResourceId, [Request], State, ChannelId).
17✔
222

223
%% Sends a non-empty batch synchronously. The channel is taken from the first
%% request; the whole batch is rendered with that channel's templates.
-spec on_batch_query(
    resource_id(),
    [{channel_id(), map()}],
    state()
) ->
    {ok, map()}
    | {error, {recoverable_error, term()}}
    | {error, term()}.
%% we only support batch insert
on_batch_query(ResourceId, Batch = [{ChannelId, _} | _], State) ->
    ?tp(emqx_bridge_kinesis_impl_producer_sync_batch_query, #{requests => Batch}),
    do_send_requests_sync(ResourceId, Batch, State, ChannelId).
7✔
235

236
%% Pool worker start callback: starts a Kinesis client with the connector
%% config stored under the `config' option by `on_start/2'.
connect(Opts) ->
    emqx_bridge_kinesis_connector_client:start_link(proplists:get_value(config, Opts)).
41✔
239

240
%%-------------------------------------------------------------------------------------------------
241
%% Helper fns
242
%%-------------------------------------------------------------------------------------------------
243

244
%% Renders the requests with the channel's templates, records the rendered
%% form for tracing, sends the records through one pool worker and translates
%% the outcome with `handle_result/3'. Crashes if `ChannelId' is not installed.
-spec do_send_requests_sync(
    resource_id(),
    [{channel_id(), map()}],
    state(),
    channel_id()
) ->
    {ok, jsx:json_term() | binary()}
    | {error, {recoverable_error, term()}}
    | {error, {unrecoverable_error, {invalid_request, term()}}}
    | {error, {unrecoverable_error, {unhealthy_target, string()}}}
    | {error, {unrecoverable_error, term()}}
    | {error, term()}.
do_send_requests_sync(
    InstanceId,
    Requests,
    #{pool_name := PoolName, installed_channels := InstalledChannels},
    ChannelId
) ->
    #{templates := Templates, stream_name := StreamName} =
        maps:get(ChannelId, InstalledChannels),
    Records = render_records(Requests, Templates),
    %% Map-shaped copy of the rendered records, used only for the trace call.
    Traceable = [#{data => D, partition_key => PK} || {D, PK} <- Records],
    emqx_trace:rendered_action_template(ChannelId, Traceable),
    QueryMFA = {emqx_bridge_kinesis_connector_client, query, [Records, StreamName]},
    handle_result(ecpool:pick_and_do(PoolName, QueryMFA, no_handover), Requests, InstanceId).
24✔
281

282
%% Translates the outcome of a Kinesis query into the resource-layer result.
%%
%% Successful results are returned untouched. Every error is logged once with
%% the offending requests and the connector id, then mapped by
%% `classify_error/1' to the value handed back to the caller.
handle_result({ok, _} = Result, _Requests, _InstanceId) ->
    Result;
handle_result({error, Reason}, Requests, InstanceId) ->
    {LoggedReason, Reply} = classify_error(Reason),
    ?SLOG(error, #{
        msg => "kinesis_error_response",
        request => Requests,
        connector => InstanceId,
        reason => LoggedReason
    }),
    Reply.

%% Maps an error reason to `{ReasonToLog, ReplyForCaller}'.
%%   * `ResourceNotFoundException' -> unrecoverable, reported as an unhealthy
%%     target with the stream-specific message;
%%   * `ProvisionedThroughputExceededException' -> recoverable;
%%   * `InvalidArgumentException' -> unrecoverable;
%%   * `econnrefused' -> recoverable; only the atom is logged and returned;
%%   * anything else -> returned as a plain `{error, Reason}'.
classify_error({<<"ResourceNotFoundException">>, _} = Reason) ->
    {Reason, {error, {unrecoverable_error, {unhealthy_target, ?TOPIC_MESSAGE}}}};
classify_error({<<"ProvisionedThroughputExceededException">>, _} = Reason) ->
    {Reason, {error, {recoverable_error, Reason}}};
classify_error({<<"InvalidArgumentException">>, _} = Reason) ->
    {Reason, {error, {unrecoverable_error, Reason}}};
classify_error({econnrefused = Reason, _}) ->
    {Reason, {error, {recoverable_error, Reason}}};
classify_error(Reason) ->
    {Reason, {error, Reason}}.
×
326

327
%% Formats a query result: `{ok, Info}' becomes a map with `result => ok' and
%% the info attached; every other value is returned unchanged.
on_format_query_result(QueryResult) ->
    case QueryResult of
        {ok, Info} ->
            #{result => ok, info => Info};
        _ ->
            QueryResult
    end.
×
331

332
%% Pre-processes the payload and partition-key templates found in the channel
%% parameters and returns them under `send_message' and `partition_key'.
parse_template(#{} = Config) ->
    #{payload_template := PayloadTemplate, partition_key := PartitionKeyTemplate} = Config,
    PartitionKeyTokens = emqx_placeholder:preproc_tmpl(PartitionKeyTemplate),
    PayloadTokens = emqx_placeholder:preproc_tmpl(PayloadTemplate),
    #{send_message => PayloadTokens, partition_key => PartitionKeyTokens}.
29✔
336

337
%% Renders every request into a `{Data, PartitionKey}' record using the
%% channel's pre-processed templates.
%%
%% Records are returned in the same order as `Items'. The previous
%% accumulator-based loop never reversed its accumulator, so batches reached
%% Kinesis in reverse request order.
%%
%% Crashes if a template key is missing or an item is not a 2-tuple.
render_records(Items, Templates) ->
    PartitionKeyTemplate = maps:get(partition_key, Templates),
    MsgTemplate = maps:get(send_message, Templates),
    lists:map(
        fun({_, Msg}) ->
            Data = emqx_placeholder:proc_tmpl(MsgTemplate, Msg),
            PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTemplate, Msg),
            {Data, PartitionKey}
        end,
        Items
    ).
33✔
353

354
%% Redacts the config for logging; `aws_secret_access_key' is flagged as
%% sensitive through the predicate passed to `emqx_utils:redact/2'.
redact(Config) ->
    IsSecretKey = fun(Key) -> Key =:= aws_secret_access_key end,
    emqx_utils:redact(Config, IsSecretKey).
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc