• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 13974320991

20 Mar 2025 04:23PM UTC coverage: 83.247%. First build
13974320991

Pull #14901

github

web-flow
Merge efefaa9b8 into 6c32718fc
Pull Request #14901: feat: port external HTTP serde from 4.x to schema registry

108 of 129 new or added lines in 5 files covered. (83.72%)

61712 of 74131 relevant lines covered (83.25%)

16590.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.71
/apps/emqx_schema_registry/src/emqx_schema_registry_schema.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2023-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4

5
-module(emqx_schema_registry_schema).
6

7
-include_lib("typerefl/include/types.hrl").
8
-include_lib("hocon/include/hoconsc.hrl").
9
-include("emqx_schema_registry.hrl").
10

11
%% `hocon_schema' API
12
-export([
13
    namespace/0,
14
    roots/0,
15
    fields/1,
16
    desc/1,
17
    tags/0,
18
    union_member_selector/1
19
]).
20

21
%% `minirest_trails' API
22
-export([
23
    api_schema/1
24
]).
25

26
%% API
27
-export([
28
    external_registry_type/0,
29
    external_registries_type/0,
30
    parse_url/1
31
]).
32

33
%%------------------------------------------------------------------------------
34
%% API
35
%%------------------------------------------------------------------------------
36

37
%% Builds the union type for a single external registry entry.
%% The union is keyed on the `type' field and currently has one member,
%% `<<"confluent">>', which refers to the `confluent_schema_registry' struct.
external_registry_type() ->
    Members = #{<<"confluent">> => ref(confluent_schema_registry)},
    %% NOTE(review): the third argument is presumably the type assumed when the
    %% `type' field is absent -- confirm against `emqx_schema:mkunion/3'.
    Fallback = <<"confluent">>,
    emqx_schema:mkunion(type, Members, Fallback).
45

46
%% Type of the `external' config field: a `hoconsc:map/2' keyed by `name'
%% whose values are of `external_registry_type/0'.
external_registries_type() ->
    RegistryType = external_registry_type(),
    hoconsc:map(name, RegistryType).
292✔
48

49
%% Parses `URL' with `emqx_utils_uri:parse/1' and validates the parsed map.
%%
%% Returns `{ok, {Base, Path, QueryParams}}' where `Base' is the result of
%% `emqx_utils_uri:request_base/1', `Path' the result of `emqx_utils_uri:path/1'
%% and `QueryParams' the result of `emqx_utils_uri:query/1' passed through
%% `emqx_maybe:define/2' with `<<"">>' as the second argument.
%%
%% Returns `{error, {invalid_url, Detail}}' when validation fails; `Detail' is
%% one of `{no_scheme, URL}', `{no_host, URL}', `{userinfo_not_supported, URL}',
%% `{fragments_not_supported, URL}' or `{invalid_base, Reason, URL}'.
parse_url(URL) ->
    do_parse_url(emqx_utils_uri:parse(URL), URL).

%% Validation clauses are tried top to bottom; the first match wins.
do_parse_url(#{scheme := undefined}, URL) ->
    {error, {invalid_url, {no_scheme, URL}}};
do_parse_url(#{authority := undefined}, URL) ->
    {error, {invalid_url, {no_host, URL}}};
do_parse_url(#{authority := #{userinfo := Userinfo}}, URL) when Userinfo =/= undefined ->
    {error, {invalid_url, {userinfo_not_supported, URL}}};
do_parse_url(#{fragment := Fragment}, URL) when Fragment =/= undefined ->
    {error, {invalid_url, {fragments_not_supported, URL}}};
do_parse_url(Parsed, URL) ->
    case emqx_utils_uri:request_base(Parsed) of
        {error, Reason} ->
            {error, {invalid_url, {invalid_base, Reason, URL}}};
        {ok, Base} ->
            Path = emqx_utils_uri:path(Parsed),
            QueryParams = emqx_maybe:define(emqx_utils_uri:query(Parsed), <<"">>),
            {ok, {Base, Path, QueryParams}}
    end.
70

71
%%------------------------------------------------------------------------------
72
%% `hocon_schema' APIs
73
%%------------------------------------------------------------------------------
74

75
%% `hocon_schema' callback: the namespace is the schema registry config root key.
namespace() ->
    ?CONF_KEY_ROOT.
2,195✔
76

77
%% `hocon_schema' callback: a single, non-required root named `?CONF_KEY_ROOT'
%% that refers to the struct of the same name.
roots() ->
    RootSc = mk(ref(?CONF_KEY_ROOT), #{required => false}),
    [{?CONF_KEY_ROOT, RootSc}].
4,477✔
79

80
%% `hocon_schema' callback: tags attached to this schema module.
tags() ->
    Tag = <<"Schema Registry">>,
    [Tag].
419✔
82

83
%% `hocon_schema' callback: returns the `{FieldName, Schema}' list for each
%% struct declared by this module.
%%
%% Atom-named structs describe configuration; string-named structs
%% ("get_*", "post_*", "put_*", "external_registry_api_create_*") are variants
%% built on top of the atom-named ones.
fields(?CONF_KEY_ROOT) ->
    ExternalSc = mk(external_registries_type(), #{
        default => #{},
        desc => ?DESC("confluent_schema_registry")
    }),
    SchemasType = hoconsc:map(name, hoconsc:union(fun union_member_selector/1)),
    SchemasSc = mk(SchemasType, #{
        default => #{},
        desc => ?DESC("schema_registry_schemas"),
        validator => fun validate_name/1
    }),
    [{external, ExternalSc}, {schemas, SchemasSc}];
fields(avro) ->
    TypeSc = mk(?avro, #{required => true, desc => ?DESC("schema_type_avro")}),
    [{type, TypeSc} | common_fields(emqx_schema:json_binary())];
fields(protobuf) ->
    TypeSc = mk(?protobuf, #{required => true, desc => ?DESC("schema_type_protobuf")}),
    [{type, TypeSc} | common_fields(binary())];
fields(json) ->
    TypeSc = mk(?json, #{required => true, desc => ?DESC("schema_type_json")}),
    [{type, TypeSc} | common_fields(emqx_schema:json_binary())];
fields(external_http) ->
    TypeSc = mk(?external_http, #{
        required => true, desc => ?DESC("schema_type_external_http")
    }),
    DescriptionSc = mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")}),
    ParametersSc = mk(ref(external_http_params), #{
        required => true, desc => ?DESC("external_http_params")
    }),
    [{type, TypeSc}, {description, DescriptionSc}, {parameters, ParametersSc}];
fields(external_http_params) ->
    %% Start from the HTTP connector's `config' fields and drop the ones listed
    %% here; `max_retries' is dropped and then declared again below.
    InheritedFields = reject_fields(
        [request, retry_interval, max_retries],
        emqx_bridge_http_connector:fields(config)
    ),
    UrlSc = mk(binary(), #{required => true, desc => ?DESC("external_http_url")}),
    HeadersSc = mk(map(), #{default => #{}, desc => ?DESC("external_http_headers")}),
    MaxRetriesSc = mk(non_neg_integer(), #{
        default => 2, desc => ?DESC(emqx_bridge_http_schema, "config_max_retries")
    }),
    RequestTimeoutSc = mk(emqx_schema:timeout_duration_ms(), #{
        default => <<"10s">>, desc => ?DESC(emqx_bridge_http_connector, "request_timeout")
    }),
    ExternalParamsSc = mk(binary(), #{
        default => <<"">>, desc => ?DESC("external_http_external_params")
    }),
    [
        {url, UrlSc},
        {headers, HeadersSc},
        {max_retries, MaxRetriesSc},
        {request_timeout, RequestTimeoutSc},
        {external_params, ExternalParamsSc}
        | InheritedFields
    ];
fields(confluent_schema_registry) ->
    TypeSc = mk(confluent, #{
        default => confluent, desc => ?DESC("schema_registry_external_type")
    }),
    UrlSc = mk(binary(), #{required => true, desc => ?DESC("confluent_schema_registry_url")}),
    AuthType = hoconsc:union([none, ref(confluent_schema_registry_auth_basic)]),
    AuthSc = mk(AuthType, #{default => none, desc => ?DESC("confluent_schema_registry_auth")}),
    [{type, TypeSc}, {url, UrlSc}, {auth, AuthSc}];
fields(confluent_schema_registry_auth_basic) ->
    MechanismSc = mk(basic, #{
        required => true,
        default => basic,
        importance => ?IMPORTANCE_HIDDEN,
        desc => ?DESC("confluent_schema_registry_auth_basic")
    }),
    UsernameSc = mk(binary(), #{
        required => true,
        desc => ?DESC("confluent_schema_registry_auth_basic_username")
    }),
    PasswordSc = emqx_schema_secret:mk(#{
        required => true,
        desc => ?DESC("confluent_schema_registry_auth_basic_password")
    }),
    [{mechanism, MechanismSc}, {username, UsernameSc}, {password, PasswordSc}];
fields("external_registry_api_create_" ++ Suffix) ->
    %% The suffix names one of the atom-keyed structs of this module; only an
    %% already existing atom is accepted.
    StructName = list_to_existing_atom(Suffix),
    NameSc = mk(binary(), #{required => true, desc => ?DESC("external_registry_name")}),
    [{name, NameSc} | fields(StructName)];
fields("get_avro") ->
    [schema_name_field() | fields(avro)];
fields("get_protobuf") ->
    [schema_name_field() | fields(protobuf)];
fields("get_json") ->
    [schema_name_field() | fields(json)];
fields("get_external_http") ->
    %% TODO: move those structs related to HTTP API to the HTTP API module.
    [schema_name_field() | fields(external_http) ++ fields(api_resource_status)];
fields(api_resource_status) ->
    %% TODO: move those structs related to HTTP API to the HTTP API module.
    StatusSc = mk(binary(), #{}),
    NodeStatusSc = mk(hoconsc:array(ref(api_node_status)), #{}),
    [{status, StatusSc}, {node_status, NodeStatusSc}];
fields(api_node_status) ->
    %% TODO: move those structs related to HTTP API to the HTTP API module.
    NodeSc = mk(binary(), #{}),
    StatusSc = mk(binary(), #{}),
    [{node, NodeSc}, {status, StatusSc}];
fields("put_avro") ->
    fields(avro);
fields("put_protobuf") ->
    fields(protobuf);
fields("put_json") ->
    fields(json);
fields("put_external_http") ->
    fields(external_http);
fields("post_external_http") ->
    %% Same as the "get" variant, minus the status fields.
    reject_fields([node_status, status], fields("get_external_http"));
fields("post_" ++ SerdeType) ->
    fields("get_" ++ SerdeType).

%% The required `name' field shared by all "get_*" structs.
schema_name_field() ->
    {name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})}.

%% Keeps only the `{Field, Schema}' pairs whose `Field' is not in `Names'.
reject_fields(Names, Fields) ->
    lists:filter(fun({Field, _Sc}) -> not lists:member(Field, Names) end, Fields).
140✔
225

226
%% Fields shared by the avro, protobuf and json structs: a required `source'
%% of the given type and a `description' defaulting to the empty binary.
common_fields(SourceType) ->
    SourceSc = mk(SourceType, #{required => true, desc => ?DESC("schema_source")}),
    DescriptionSc = mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")}),
    [{source, SourceSc}, {description, DescriptionSc}].
231

232
%% `hocon_schema' callback: description reference for each documented struct;
%% any other struct name yields `undefined'.
desc(?CONF_KEY_ROOT) -> ?DESC("schema_registry_root");
desc(avro) -> ?DESC("avro_type");
desc(protobuf) -> ?DESC("protobuf_type");
desc(json) -> ?DESC("json_type");
desc(external_http) -> ?DESC("external_http_type");
desc(external_http_params) -> ?DESC("external_http_params");
desc(confluent_schema_registry) -> ?DESC("confluent_schema_registry");
desc(confluent_schema_registry_auth_basic) -> ?DESC("confluent_schema_registry_auth");
desc(_) -> undefined.
29✔
250

251
%% Union selector for configured schemas: lists every member when asked for
%% `all_union_members', otherwise picks the member matching the given value.
union_member_selector(all_union_members) -> refs();
union_member_selector({value, Value}) -> refs(Value).
102✔
255

256
%% Returns a union selector fun bound to `Method'; it behaves like
%% `union_member_selector/1' but resolves members through `refs_api/1,2'.
union_member_selector_api(Method) ->
    Selector = fun
        (all_union_members) -> refs_api(Method);
        ({value, Value}) -> refs_api(Method, Value)
    end,
    Selector.
263

264
%% Validator for the `schemas' map: rejects the map when it contains the key
%% `?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME', otherwise returns `ok'.
validate_name(NameSchemaMap) ->
    IllegalName = ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME,
    case maps:is_key(IllegalName, NameSchemaMap) of
        false ->
            ok;
        true ->
            {error, <<"Illegal schema name ", IllegalName/binary>>}
    end.
272

273
%%------------------------------------------------------------------------------
274
%% `minirest_trails' "APIs"
275
%%------------------------------------------------------------------------------
276

277
%% Union type used by the HTTP API for the given method.
%% "get" and "post" resolve members through the method-specific selector;
%% "put" uses the plain `union_member_selector/1'.
api_schema(Method) when Method =:= "get"; Method =:= "post" ->
    hoconsc:union(union_member_selector_api(Method));
api_schema("put") ->
    hoconsc:union(fun union_member_selector/1).
140✔
283

284
%%------------------------------------------------------------------------------
285
%% Internal fns
286
%%------------------------------------------------------------------------------
287

288
%% Shorthand for `hoconsc:mk/2'.
mk(Type, Meta) ->
    hoconsc:mk(Type, Meta).
8,479✔
289
%% Reference to the struct `Name' declared in this module.
ref(Name) ->
    hoconsc:ref(?MODULE, Name).
9,852✔
290

291
%% Serde types that may appear as members of the schema unions, in the order
%% they are listed to callers.
supported_serde_types() ->
    [
        ?avro,
        ?protobuf,
        ?json,
        ?external_http
    ].
1,079✔
293

294
%% References to the structs of all supported serde types.
refs() ->
    lists:map(fun ref/1, supported_serde_types()).
328✔
296

297
%% Selects the union member for a raw value based on its `<<"type">>' key.
%% An atom type is converted to a binary and matched again. Anything that does
%% not name a known type throws a map with `field_name' and `expected' keys.
refs(#{<<"type">> := Type} = Value) when is_atom(Type) ->
    refs(Value#{<<"type">> := atom_to_binary(Type)});
refs(#{<<"type">> := <<"avro">>}) ->
    [ref(avro)];
refs(#{<<"type">> := <<"protobuf">>}) ->
    [ref(protobuf)];
refs(#{<<"type">> := <<"json">>}) ->
    [ref(json)];
refs(#{<<"type">> := <<"external_http">>}) ->
    [ref(external_http)];
refs(_) ->
    TypeNames = [atom_to_list(T) || T <- supported_serde_types()],
    throw(#{field_name => type, expected => lists:join(" | ", TypeNames)}).
313

314
%% References to the "<Method>_<type>" structs of all supported serde types.
refs_api(Method) ->
    ToRef = fun(Type) -> ref(Method ++ "_" ++ atom_to_list(Type)) end,
    lists:map(ToRef, supported_serde_types()).
745✔
316

317
%% Selects the "<Method>_<type>" union member for a raw value based on its
%% `<<"type">>' key. An atom type is converted to a binary and matched again.
%% Anything that does not name a known type throws a map with `field_name' and
%% `expected' keys.
refs_api(Method, #{<<"type">> := Type} = Value) when is_atom(Type) ->
    refs_api(Method, Value#{<<"type">> := atom_to_binary(Type)});
refs_api(Method, #{<<"type">> := Type}) when
    Type =:= <<"avro">>;
    Type =:= <<"protobuf">>;
    Type =:= <<"json">>;
    Type =:= <<"external_http">>
->
    [ref(Method ++ "_" ++ binary_to_list(Type))];
refs_api(_Method, _) ->
    TypeNames = [atom_to_list(T) || T <- supported_serde_types()],
    throw(#{field_name => type, expected => lists:join(" | ", TypeNames)}).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc