• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 13979276088

20 Mar 2025 09:02PM UTC coverage: 83.253%. First build
13979276088

Pull #14901

github

web-flow
Merge 7b2283018 into 6c32718fc
Pull Request #14901: feat: port external HTTP serde from 4.x to schema registry

113 of 142 new or added lines in 7 files covered. (79.58%)

61708 of 74121 relevant lines covered (83.25%)

16593.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.67
/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2023-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4
-module(emqx_schema_registry_serde).
5

6
-feature(maybe_expr, enable).
7

8
-behaviour(emqx_rule_funcs).
9

10
-include("emqx_schema_registry.hrl").
11
-include_lib("emqx/include/logger.hrl").
12
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
13

14
%% API
15
-export([
16
    make_serde/3,
17
    handle_rule_function/2,
18
    schema_check/3,
19
    is_existing_type/1,
20
    is_existing_type/2,
21
    destroy/1
22
]).
23

24
%% Tests
25
-export([
26
    decode/2,
27
    decode/3,
28
    encode/2,
29
    encode/3,
30
    eval_decode/2,
31
    eval_encode/2
32
]).
33

34
-elvis([{elvis_style, no_match_in_condition, disable}]).
35

36
%%------------------------------------------------------------------------------
37
%% Type definitions
38
%%------------------------------------------------------------------------------
39

40
-define(BOOL(SerdeName, EXPR),
41
    try
42
        _ = EXPR,
43
        true
44
    catch
45
        error:Reason ->
46
            ?SLOG(debug, #{msg => "schema_check_failed", schema => SerdeName, reason => Reason}),
47
            false
48
    end
49
).
50

51
-type eval_context() :: term().
52

53
-type fingerprint() :: binary().
54

55
-type otp_release() :: string().
56

57
-type protobuf_cache_key() :: {schema_name(), otp_release(), fingerprint()}.
58

59
-export_type([serde_type/0]).
60

61
%%------------------------------------------------------------------------------
62
%% API
63
%%------------------------------------------------------------------------------
64

65
-spec is_existing_type(schema_name()) -> boolean().
66
is_existing_type(SchemaName) ->
67
    is_existing_type(SchemaName, []).
×
68

69
-spec is_existing_type(schema_name(), [binary()]) -> boolean().
70
is_existing_type(SchemaName, Path) ->
71
    maybe
25✔
72
        {ok, #serde{type = SerdeType, eval_context = EvalContext}} ?=
25✔
73
            emqx_schema_registry:get_serde(SchemaName),
74
        has_inner_type(SerdeType, EvalContext, Path)
16✔
75
    else
76
        _ -> false
9✔
77
    end.
78

79
-spec handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
80
handle_rule_function(sparkplug_decode, [Data]) ->
81
    handle_rule_function(
2✔
82
        schema_decode,
83
        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data, <<"Payload">>]
84
    );
85
handle_rule_function(sparkplug_decode, [Data | MoreArgs]) ->
86
    handle_rule_function(
1✔
87
        schema_decode,
88
        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs]
89
    );
90
handle_rule_function(sparkplug_encode, [Term]) ->
91
    handle_rule_function(
3✔
92
        schema_encode,
93
        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term, <<"Payload">>]
94
    );
95
handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
96
    handle_rule_function(
1✔
97
        schema_encode,
98
        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
99
    );
100
handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
101
    decode(SchemaId, Data, MoreArgs);
14✔
102
handle_rule_function(schema_decode, Args) ->
103
    error({args_count_error, {schema_decode, Args}});
×
104
handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
105
    %% encode outputs iolists, but when the rule actions process those
106
    %% it might wrongly encode them as JSON lists, so we force them to
107
    %% binaries here.
108
    IOList = encode(SchemaId, Term, MoreArgs),
18✔
109
    iolist_to_binary(IOList);
13✔
110
handle_rule_function(schema_encode, Args) ->
111
    error({args_count_error, {schema_encode, Args}});
×
112
handle_rule_function(schema_check, [SchemaId, Data | MoreArgs]) ->
113
    schema_check(SchemaId, Data, MoreArgs);
7✔
114
handle_rule_function(avro_encode, [RegistryName, Data | Args]) ->
115
    case emqx_schema_registry_external:encode(RegistryName, Data, Args, #{tag => false}) of
1✔
116
        {ok, Encoded} ->
117
            Encoded;
1✔
118
        {error, Reason} ->
119
            error(Reason)
×
120
    end;
121
handle_rule_function(avro_decode, [RegistryName, Data | Args]) ->
122
    case emqx_schema_registry_external:decode(RegistryName, Data, Args, _Opts = #{}) of
1✔
123
        {ok, Decoded} ->
124
            Decoded;
1✔
125
        {error, Reason} ->
126
            error(Reason)
×
127
    end;
128
handle_rule_function(schema_encode_and_tag, [OurSchemaName, RegistryName, Data | Args]) ->
129
    case handle_schema_encode_and_tag(OurSchemaName, RegistryName, Data, Args) of
1✔
130
        {ok, Encoded} ->
131
            Encoded;
1✔
132
        {error, Reason} ->
133
            error(Reason)
×
134
    end;
135
handle_rule_function(schema_decode_tagged, [RegistryName, Data | Args]) ->
136
    case emqx_schema_registry_external:decode(RegistryName, Data, Args, _Opts = #{}) of
1✔
137
        {ok, Decoded} ->
138
            Decoded;
1✔
139
        {error, Reason} ->
140
            error(Reason)
×
141
    end;
142
handle_rule_function(_, _) ->
143
    {error, no_match_for_function}.
×
144

145
-spec schema_check(schema_name(), decoded_data() | encoded_data(), [term()]) -> decoded_data().
146
schema_check(SerdeName, Data, VarArgs) when is_list(VarArgs), is_binary(Data) ->
147
    with_serde(
18✔
148
        SerdeName,
149
        fun(Serde) ->
150
            ?BOOL(SerdeName, eval_decode(Serde, [Data | VarArgs]))
17✔
151
        end
152
    );
153
schema_check(SerdeName, Data, VarArgs) when is_list(VarArgs), is_map(Data) ->
154
    with_serde(
3✔
155
        SerdeName,
156
        fun(Serde) ->
157
            ?BOOL(SerdeName, eval_encode(Serde, [Data | VarArgs]))
2✔
158
        end
159
    ).
160

161
-spec decode(schema_name(), encoded_data()) -> decoded_data().
162
decode(SerdeName, RawData) ->
163
    decode(SerdeName, RawData, []).
11✔
164

165
-spec decode(schema_name(), encoded_data(), [term()]) -> decoded_data().
166
decode(SerdeName, RawData, VarArgs) when is_list(VarArgs) ->
167
    with_serde(SerdeName, fun(Serde) ->
40✔
168
        eval_decode(Serde, [RawData | VarArgs])
38✔
169
    end).
170

171
-spec encode(schema_name(), decoded_data()) -> encoded_data().
172
encode(SerdeName, RawData) ->
173
    encode(SerdeName, RawData, []).
16✔
174

175
-spec encode(schema_name(), decoded_data(), [term()]) -> encoded_data().
176
encode(SerdeName, Data, VarArgs) when is_list(VarArgs) ->
177
    with_serde(
55✔
178
        SerdeName,
179
        fun(Serde) ->
180
            eval_encode(Serde, [Data | VarArgs])
52✔
181
        end
182
    ).
183

184
with_serde(Name, F) ->
185
    case emqx_schema_registry:get_serde(Name) of
116✔
186
        {ok, Serde} ->
187
            Meta =
109✔
188
                case logger:get_process_metadata() of
189
                    undefined -> #{};
12✔
190
                    Meta0 -> Meta0
97✔
191
                end,
192
            logger:update_process_metadata(#{schema_name => Name}),
109✔
193
            try
109✔
194
                F(Serde)
109✔
195
            after
196
                logger:set_process_metadata(Meta)
109✔
197
            end;
198
        {error, not_found} ->
199
            error({serde_not_found, Name})
7✔
200
    end.
201

202
-spec make_serde(serde_type(), schema_name(), schema_source()) -> serde().
203
make_serde(?avro, Name, Source) ->
204
    Store0 = avro_schema_store:new([map]),
20✔
205
    %% import the schema into the map store with an assigned name
206
    %% if it's a named schema (e.g. struct), then Name is added as alias
207
    Store = avro_schema_store:import_schema_json(Name, Source, Store0),
20✔
208
    #serde{
16✔
209
        name = Name,
210
        type = ?avro,
211
        eval_context = Store
212
    };
213
make_serde(?protobuf, Name, Source) ->
214
    {CacheKey, SerdeMod} = make_protobuf_serde_mod(Name, Source),
48✔
215
    #serde{
44✔
216
        name = Name,
217
        type = ?protobuf,
218
        eval_context = SerdeMod,
219
        extra = #{cache_key => CacheKey}
220
    };
221
make_serde(?json, Name, Source) ->
222
    case json_decode(Source) of
21✔
223
        SchemaObj when is_map(SchemaObj) ->
224
            %% jesse:add_schema adds any map() without further validation
225
            %% if it's not a map, then case_clause
226
            ok = jesse_add_schema(Name, SchemaObj),
18✔
227
            #serde{name = Name, type = ?json};
18✔
228
        _NotMap ->
229
            error({invalid_json_schema, bad_schema_object})
3✔
230
    end;
231
make_serde(?external_http, Name, Params) ->
232
    Context = create_external_http_resource(Name, Params),
3✔
233
    #serde{
3✔
234
        name = Name,
235
        type = ?external_http,
236
        eval_context = Context,
237
        extra = #{}
238
    }.
239

240
eval_decode(#serde{type = ?avro, name = Name, eval_context = Store}, [Data]) ->
241
    Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
17✔
242
    avro_binary_decoder:decode(Data, Name, Store, Opts);
17✔
243
eval_decode(#serde{type = ?protobuf}, [#{} = DecodedData, MessageType]) ->
244
    %% Already decoded, so it's an user error.
245
    throw(
1✔
246
        {schema_decode_error, #{
247
            error_type => decoding_failure,
248
            data => DecodedData,
249
            message_type => MessageType,
250
            explain =>
251
                <<
252
                    "Attempted to schema decode an already decoded message."
253
                    " Check your rules or transformation pipeline."
254
                >>
255
        }}
256
    );
257
eval_decode(#serde{type = ?protobuf, eval_context = SerdeMod}, [EncodedData, MessageType0]) ->
258
    MessageType = binary_to_existing_atom(MessageType0, utf8),
32✔
259
    try
30✔
260
        Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageType]),
30✔
261
        emqx_utils_maps:binary_key_map(Decoded)
23✔
262
    catch
263
        error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} ->
264
            #{schema_name := SchemaName} = logger:get_process_metadata(),
3✔
265
            throw(
3✔
266
                {schema_decode_error, #{
267
                    error_type => decoding_failure,
268
                    data => EncodedData,
269
                    message_type => MessageType,
270
                    schema_name => SchemaName,
271
                    explain =>
272
                        <<"The given data could not be decoded. Please check the input data and the schema.">>
273
                }}
274
            )
275
    end;
276
eval_decode(#serde{type = ?json, name = Name}, [Data]) ->
277
    true = is_binary(Data),
12✔
278
    Term = json_decode(Data),
12✔
279
    {ok, NewTerm} = jesse_validate(Name, Term),
12✔
280
    NewTerm;
8✔
281
eval_decode(#serde{type = ?external_http, name = Name, eval_context = Context}, [Payload]) ->
282
    Request = generate_external_http_request(Payload, decode, Name, Context),
3✔
283
    exec_external_http_request(Request, Context).
3✔
284

285
eval_encode(#serde{type = ?avro, name = Name, eval_context = Store}, [Data]) ->
286
    avro_binary_encoder:encode(Store, Name, Data);
18✔
287
eval_encode(#serde{type = ?protobuf, eval_context = SerdeMod}, [DecodedData0, MessageName0]) ->
288
    DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0),
33✔
289
    MessageName = binary_to_existing_atom(MessageName0, utf8),
33✔
290
    apply(SerdeMod, encode_msg, [DecodedData, MessageName]);
33✔
291
eval_encode(#serde{type = ?json, name = Name}, [Map]) ->
292
    %% The input Map may not be a valid JSON term for jesse
293
    Data = iolist_to_binary(emqx_utils_json:encode(Map)),
8✔
294
    NewMap = json_decode(Data),
8✔
295
    case jesse_validate(Name, NewMap) of
8✔
296
        {ok, _} ->
297
            Data;
7✔
298
        {error, Reason} ->
299
            error(Reason)
1✔
300
    end;
301
eval_encode(#serde{type = ?external_http, name = Name, eval_context = Context}, [Payload]) ->
302
    Request = generate_external_http_request(Payload, encode, Name, Context),
6✔
303
    exec_external_http_request(Request, Context).
6✔
304

305
destroy(#serde{type = ?avro = Type, name = _Name}) ->
306
    ?tp(serde_destroyed, #{type => Type, name => _Name}),
16✔
307
    ok;
16✔
308
destroy(#serde{type = ?protobuf = Type, name = _Name, eval_context = SerdeMod} = Serde) ->
309
    unload_code(SerdeMod),
24✔
310
    destroy_protobuf_code(Serde),
24✔
311
    ?tp(serde_destroyed, #{type => Type, name => _Name}),
24✔
312
    ok;
24✔
313
destroy(#serde{type = ?json = Type, name = Name}) ->
314
    ok = jesse_del_schema(Name),
17✔
315
    ?tp(serde_destroyed, #{type => Type, name => Name}),
17✔
316
    ok;
17✔
317
destroy(#serde{type = ?external_http = Type, name = _Name, eval_context = Context}) ->
318
    ok = destroy_external_http_resource(Context),
3✔
319
    ?tp(serde_destroyed, #{type => Type, name => _Name}),
3✔
320
    ok.
3✔
321

322
%%------------------------------------------------------------------------------
323
%% Internal fns
324
%%------------------------------------------------------------------------------
325

326
json_decode(Data) ->
327
    emqx_utils_json:decode(Data, [return_maps]).
41✔
328

329
jesse_add_schema(Name, Obj) ->
330
    jesse:add_schema(jesse_name(Name), Obj).
18✔
331

332
jesse_del_schema(Name) ->
333
    jesse:del_schema(jesse_name(Name)).
17✔
334

335
jesse_validate(Name, Map) ->
336
    jesse:validate(jesse_name(Name), Map, []).
20✔
337

338
jesse_name(Str) ->
339
    unicode:characters_to_list(Str).
55✔
340

341
-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> {protobuf_cache_key(), module()}.
342
make_protobuf_serde_mod(Name, Source) ->
343
    {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
48✔
344
    case lazy_generate_protobuf_code(Name, SerdeMod0, Source) of
48✔
345
        {ok, SerdeMod, ModBinary} ->
346
            load_code(SerdeMod, SerdeModFileName, ModBinary),
44✔
347
            CacheKey = protobuf_cache_key(Name, Source),
44✔
348
            {CacheKey, SerdeMod};
44✔
349
        {error, #{error := Error, warnings := Warnings}} ->
350
            ?SLOG(
4✔
351
                warning,
4✔
352
                #{
353
                    msg => "error_generating_protobuf_code",
354
                    error => Error,
355
                    warnings => Warnings
356
                }
4✔
357
            ),
358
            error({invalid_protobuf_schema, Error})
4✔
359
    end.
360

361
-spec protobuf_serde_mod_name(schema_name()) -> {module(), string()}.
362
protobuf_serde_mod_name(Name) ->
363
    %% must be a string (list)
364
    SerdeModName = "$schema_parser_" ++ binary_to_list(Name),
48✔
365
    SerdeMod = list_to_atom(SerdeModName),
48✔
366
    %% the "path" to the module, for `code:load_binary'.
367
    SerdeModFileName = SerdeModName ++ ".memory",
48✔
368
    {SerdeMod, SerdeModFileName}.
48✔
369

370
%% Fixme: we cannot uncomment the following typespec because Dialyzer complains that
371
%% `Source' should be `string()' due to `gpb_compile:string/3', but it does work fine with
372
%% binaries...
373
%% -spec protobuf_cache_key(schema_name(), schema_source()) -> {schema_name(), fingerprint()}.
374
protobuf_cache_key(Name, Source) ->
375
    {Name, erlang:system_info(otp_release), erlang:md5(Source)}.
92✔
376

377
-spec lazy_generate_protobuf_code(schema_name(), module(), schema_source()) ->
378
    {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
379
lazy_generate_protobuf_code(Name, SerdeMod0, Source) ->
380
    %% We run this inside a transaction with locks to avoid running
381
    %% the compile on all nodes; only one will get the lock, compile
382
    %% the schema, and other nodes will simply read the final result.
383
    {atomic, Res} = mria:transaction(
48✔
384
        ?SCHEMA_REGISTRY_SHARD,
385
        fun lazy_generate_protobuf_code_trans/3,
386
        [Name, SerdeMod0, Source]
387
    ),
388
    Res.
48✔
389

390
-spec lazy_generate_protobuf_code_trans(schema_name(), module(), schema_source()) ->
391
    {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
392
lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) ->
393
    CacheKey = protobuf_cache_key(Name, Source),
48✔
394
    _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, CacheKey}, write),
48✔
395
    case mnesia:read(?PROTOBUF_CACHE_TAB, CacheKey) of
48✔
396
        [#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] ->
397
            ?tp(schema_registry_protobuf_cache_hit, #{name => Name}),
8✔
398
            {ok, SerdeMod, ModBinary};
8✔
399
        [] ->
400
            ?tp(schema_registry_protobuf_cache_miss, #{name => Name}),
40✔
401
            case generate_protobuf_code(SerdeMod0, Source) of
40✔
402
                {ok, SerdeMod, ModBinary} ->
403
                    CacheEntry = #protobuf_cache{
36✔
404
                        fingerprint = CacheKey,
405
                        module = SerdeMod,
406
                        module_binary = ModBinary
407
                    },
408
                    ok = mnesia:write(?PROTOBUF_CACHE_TAB, CacheEntry, write),
36✔
409
                    {ok, SerdeMod, ModBinary};
36✔
410
                {ok, SerdeMod, ModBinary, _Warnings} ->
411
                    CacheEntry = #protobuf_cache{
×
412
                        fingerprint = CacheKey,
413
                        module = SerdeMod,
414
                        module_binary = ModBinary
415
                    },
416
                    ok = mnesia:write(?PROTOBUF_CACHE_TAB, CacheEntry, write),
×
417
                    {ok, SerdeMod, ModBinary};
×
418
                error ->
419
                    {error, #{error => undefined, warnings => []}};
×
420
                {error, Error} ->
421
                    {error, #{error => Error, warnings => []}};
4✔
422
                {error, Error, Warnings} ->
423
                    {error, #{error => Error, warnings => Warnings}}
×
424
            end
425
    end.
426

427
generate_protobuf_code(SerdeMod, Source) ->
428
    gpb_compile:string(
40✔
429
        SerdeMod,
430
        Source,
431
        [
432
            binary,
433
            strings_as_binaries,
434
            {maps, true},
435
            %% Fixme: currently, some bug in `gpb' prevents this
436
            %% option from working with `oneof' types...  We're then
437
            %% forced to use atom key maps.
438
            %% {maps_key_type, binary},
439
            {maps_oneof, flat},
440
            {verify, always},
441
            {maps_unset_optional, omitted}
442
        ]
443
    ).
444

445
-spec load_code(module(), string(), binary()) -> ok.
446
load_code(SerdeMod, SerdeModFileName, ModBinary) ->
447
    _ = code:purge(SerdeMod),
44✔
448
    {module, SerdeMod} = code:load_binary(SerdeMod, SerdeModFileName, ModBinary),
44✔
449
    ok.
44✔
450

451
-spec unload_code(module()) -> ok.
452
unload_code(SerdeMod) ->
453
    _ = code:purge(SerdeMod),
24✔
454
    _ = code:delete(SerdeMod),
24✔
455
    ok.
24✔
456

457
-spec destroy_protobuf_code(serde()) -> ok.
458
destroy_protobuf_code(Serde) ->
459
    #serde{extra = #{cache_key := CacheKey}} = Serde,
24✔
460
    {atomic, Res} = mria:transaction(
24✔
461
        ?SCHEMA_REGISTRY_SHARD,
462
        fun destroy_protobuf_code_trans/1,
463
        [CacheKey]
464
    ),
465
    ?tp("schema_registry_protobuf_cache_destroyed", #{name => Serde#serde.name}),
24✔
466
    Res.
24✔
467

468
-spec destroy_protobuf_code_trans({schema_name(), fingerprint()}) -> ok.
469
destroy_protobuf_code_trans(CacheKey) ->
470
    mnesia:delete(?PROTOBUF_CACHE_TAB, CacheKey, write).
24✔
471

472
-spec has_inner_type(serde_type(), eval_context(), [binary()]) ->
473
    boolean().
474
has_inner_type(protobuf, _SerdeMod, [_, _ | _]) ->
475
    %% Protobuf only has one level of message types.
476
    false;
1✔
477
has_inner_type(protobuf, SerdeMod, [MessageTypeBin]) ->
478
    try apply(SerdeMod, get_msg_names, []) of
6✔
479
        Names ->
480
            lists:member(MessageTypeBin, [atom_to_binary(N, utf8) || N <- Names])
6✔
481
    catch
482
        _:_ ->
483
            false
×
484
    end;
485
has_inner_type(_SerdeType, _EvalContext, []) ->
486
    %% This function is only called if we already found a serde, so the root does exist.
487
    true;
7✔
488
has_inner_type(_SerdeType, _EvalContext, _Path) ->
489
    false.
2✔
490

491
handle_schema_encode_and_tag(OurSchemaName, RegistryName, Data, Args) ->
492
    maybe
1✔
493
        {ok, Schema} ?= emqx_schema_registry:get_schema(OurSchemaName),
1✔
494
        Source = maps:get(source, Schema),
1✔
495
        emqx_schema_registry_external:encode_with(
1✔
496
            RegistryName,
497
            OurSchemaName,
498
            Source,
499
            Data,
500
            Args,
501
            #{tag => true}
502
        )
503
    end.
504

505
create_external_http_resource(Name, Params) ->
506
    ResourceId = <<"schema_registry:external_http:", Name/binary>>,
3✔
507
    #{
3✔
508
        url := URL,
509
        max_retries := MaxRetries,
510
        connect_timeout := ConnectTimeout,
511
        request_timeout := RequestTimeout,
512
        headers := Headers,
513
        pool_type := PoolType,
514
        pool_size := PoolSize,
515
        external_params := ExternalParams
516
    } = Params,
517
    {ok, {Base, Path, QueryParams}} = emqx_schema_registry_schema:parse_url(URL),
3✔
518
    ConnectorConfig = #{
3✔
519
        request_base => Base,
520
        connect_timeout => ConnectTimeout,
521
        pool_type => PoolType,
522
        pool_size => PoolSize
523
    },
524
    ResourceOpts = #{
3✔
525
        start_after_created => true,
526
        spawn_buffer_workers => false,
527
        query_mode => simple_sync
528
    },
529
    {ok, _} = emqx_resource:create_local(
3✔
530
        ResourceId,
531
        ?SCHEMA_REGISTRY_RESOURCE_GROUP,
532
        emqx_bridge_http_connector,
533
        ConnectorConfig,
534
        ResourceOpts
535
    ),
536
    #{
3✔
537
        resource_id => ResourceId,
538
        headers => maps:to_list(Headers),
539
        path => Path,
540
        query_params => QueryParams,
541
        external_params => ExternalParams,
542
        request_timeout => RequestTimeout,
543
        max_retries => MaxRetries
544
    }.
545

546
destroy_external_http_resource(Context) ->
547
    #{resource_id := ResourceId} = Context,
3✔
548
    emqx_resource:remove_local(ResourceId).
3✔
549

550
generate_external_http_request(Payload, EncodeOrDecode, Name, Context) ->
551
    #{
9✔
552
        headers := Headers,
553
        path := Path,
554
        query_params := QueryParams,
555
        external_params := ExternalParams
556
    } = Context,
557
    PathWithQuery = append_query(Path, QueryParams),
9✔
558
    Body = #{
9✔
559
        <<"payload">> => base64:encode(Payload),
560
        <<"type">> => EncodeOrDecode,
561
        <<"schema_name">> => Name,
562
        <<"opts">> => ExternalParams
563
    },
564
    BodyBin = emqx_utils_json:encode(Body),
9✔
565
    {PathWithQuery, Headers, BodyBin}.
9✔
566

567
exec_external_http_request(Request, Context) ->
568
    #{
9✔
569
        resource_id := ResourceId,
570
        request_timeout := RequestTimeout,
571
        max_retries := MaxRetries
572
    } = Context,
573
    Query = {
9✔
574
        _ActionResId = undefined,
575
        _KeyOrNum = undefined,
576
        _Method = post,
577
        Request,
578
        RequestTimeout,
579
        MaxRetries
580
    },
581
    Result = emqx_resource:query(ResourceId, Query),
9✔
582
    handle_external_http_result(Result).
9✔
583

584
handle_external_http_result({ok, 200, _RespHeaders, RespEncoded}) ->
585
    try base64:decode(RespEncoded) of
7✔
586
        Resp ->
587
            Resp
6✔
588
    catch
589
        error:Reason ->
590
            error(#{
1✔
591
                msg => bad_external_http_response_format,
592
                hint => <<"server response is not a valid base64-encoded string">>,
593
                response => RespEncoded,
594
                reason => Reason
595
            })
596
    end;
597
handle_external_http_result({ok, StatusCode, _RespHeaders, RespBody}) ->
NEW
598
    error(#{
×
599
        msg => bad_external_http_response_status_code,
600
        response => RespBody,
601
        status_code => StatusCode
602
    });
603
handle_external_http_result({error, {unrecoverable_error, #{status_code := _} = Reason}}) ->
604
    error(#{
1✔
605
        msg => external_http_request_failed,
606
        reason => maps:with([status_code, body], Reason)
607
    });
608
handle_external_http_result({error, {recoverable_error, #{status_code := _} = Reason}}) ->
NEW
609
    error(#{
×
610
        msg => external_http_request_failed,
611
        reason => maps:with([status_code, body], Reason)
612
    });
613
handle_external_http_result({error, {unrecoverable_error, Reason}}) ->
NEW
614
    error(#{
×
615
        msg => external_http_request_failed,
616
        reason => Reason
617
    });
618
handle_external_http_result({error, {recoverable_error, Reason}}) ->
619
    error(#{
1✔
620
        msg => external_http_request_failed,
621
        reason => Reason
622
    });
623
handle_external_http_result({error, Reason}) ->
NEW
624
    error(#{
×
625
        msg => external_http_request_failed,
626
        reason => Reason
627
    }).
628

629
append_query(Path, <<"">>) ->
NEW
630
    Path;
×
631
append_query(Path, Query) ->
632
    [Path, $?, Query].
9✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc