• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 13974320991

20 Mar 2025 04:23PM UTC coverage: 83.247%. First build
13974320991

Pull #14901

github

web-flow
Merge efefaa9b8 into 6c32718fc
Pull Request #14901: feat: port external HTTP serde from 4.x to schema registry

108 of 129 new or added lines in 5 files covered. (83.72%)

61712 of 74131 relevant lines covered (83.25%)

16590.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.39
/apps/emqx_schema_registry/src/emqx_schema_registry_http_api.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2023-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4
-module(emqx_schema_registry_http_api).
5

6
-behaviour(minirest_api).
7

8
-include("emqx_schema_registry.hrl").
9
-include_lib("hocon/include/hoconsc.hrl").
10
-include_lib("emqx/include/logger.hrl").
11
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
12

13
-export([
14
    namespace/0,
15
    api_spec/0,
16
    paths/0,
17
    schema/1
18
]).
19

20
-export([
21
    '/schema_registry'/2,
22
    '/schema_registry/:name'/2,
23
    '/schema_registry_external'/2,
24
    '/schema_registry_external/registry/:name'/2
25
]).
26

27
%% BPAPI RPC Targets
28
-export([
29
    lookup_resource_from_local_node_v1/2
30
]).
31

32
-define(TAGS, [<<"Schema Registry">>]).
33
-define(BPAPI_NAME, emqx_schema_registry_http_api).
34

35
-define(IS_TYPE_WITH_RESOURCE(CONFIG), (map_get(type, CONFIG) == ?external_http)).
36

37
%%-------------------------------------------------------------------------------------------------
38
%% `minirest' and `minirest_trails' API
39
%%-------------------------------------------------------------------------------------------------
40

41
namespace() -> "schema_registry_http_api".
×
42

43
api_spec() ->
44
    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
39✔
45

46
paths() ->
47
    [
48
        "/schema_registry",
39✔
49
        "/schema_registry/:name",
50
        "/schema_registry_external",
51
        "/schema_registry_external/registry/:name"
52
    ].
53

54
schema("/schema_registry") ->
55
    #{
75✔
56
        'operationId' => '/schema_registry',
57
        get => #{
58
            tags => ?TAGS,
59
            summary => <<"List registered schemas">>,
60
            description => ?DESC("desc_schema_registry_api_list"),
61
            responses =>
62
                #{
63
                    200 =>
64
                        emqx_dashboard_swagger:schema_with_examples(
65
                            hoconsc:array(emqx_schema_registry_schema:api_schema("get")),
66
                            #{
67
                                sample =>
68
                                    #{value => sample_list_schemas_response()}
69
                            }
70
                        )
71
                }
72
        },
73
        post => #{
74
            tags => ?TAGS,
75
            summary => <<"Register a new schema">>,
76
            description => ?DESC("desc_schema_registry_api_post"),
77
            'requestBody' => emqx_dashboard_swagger:schema_with_examples(
78
                emqx_schema_registry_schema:api_schema("post"),
79
                post_examples()
80
            ),
81
            responses =>
82
                #{
83
                    201 =>
84
                        emqx_dashboard_swagger:schema_with_examples(
85
                            emqx_schema_registry_schema:api_schema("post"),
86
                            post_examples()
87
                        ),
88
                    400 => error_schema('ALREADY_EXISTS', "Schema already exists")
89
                }
90
        }
91
    };
92
schema("/schema_registry/:name") ->
93
    #{
70✔
94
        'operationId' => '/schema_registry/:name',
95
        get => #{
96
            tags => ?TAGS,
97
            summary => <<"Get registered schema">>,
98
            description => ?DESC("desc_schema_registry_api_get"),
99
            parameters => [param_path_schema_name()],
100
            responses =>
101
                #{
102
                    200 =>
103
                        emqx_dashboard_swagger:schema_with_examples(
104
                            emqx_schema_registry_schema:api_schema("get"),
105
                            get_examples()
106
                        ),
107
                    404 => error_schema('NOT_FOUND', "Schema not found")
108
                }
109
        },
110
        put => #{
111
            tags => ?TAGS,
112
            summary => <<"Update a schema">>,
113
            description => ?DESC("desc_schema_registry_api_put"),
114
            parameters => [param_path_schema_name()],
115
            'requestBody' => emqx_dashboard_swagger:schema_with_examples(
116
                emqx_schema_registry_schema:api_schema("put"),
117
                put_examples()
118
            ),
119
            responses =>
120
                #{
121
                    200 =>
122
                        emqx_dashboard_swagger:schema_with_examples(
123
                            emqx_schema_registry_schema:api_schema("put"),
124
                            post_examples()
125
                        ),
126
                    404 => error_schema('NOT_FOUND', "Schema not found")
127
                }
128
        },
129
        delete => #{
130
            tags => ?TAGS,
131
            summary => <<"Delete registered schema">>,
132
            description => ?DESC("desc_schema_registry_api_delete"),
133
            parameters => [param_path_schema_name()],
134
            responses =>
135
                #{
136
                    204 => <<"Schema deleted">>,
137
                    404 => error_schema('NOT_FOUND', "Schema not found")
138
                }
139
        }
140
    };
141
schema("/schema_registry_external") ->
142
    #{
49✔
143
        'operationId' => '/schema_registry_external',
144
        get => #{
145
            tags => ?TAGS,
146
            summary => <<"List external registries">>,
147
            description => ?DESC("external_registry_list"),
148
            responses =>
149
                #{
150
                    200 =>
151
                        emqx_dashboard_swagger:schema_with_examples(
152
                            hoconsc:map(
153
                                name, emqx_schema_registry_schema:external_registries_type()
154
                            ),
155
                            #{
156
                                sample =>
157
                                    #{value => sample_list_external_registries_response()}
158
                            }
159
                        )
160
                }
161
        },
162
        post => #{
163
            tags => ?TAGS,
164
            summary => <<"Add a new external registry">>,
165
            description => ?DESC("external_registry_create"),
166
            'requestBody' => emqx_dashboard_swagger:schema_with_examples(
167
                hoconsc:union(fun create_external_registry_union/1),
168
                create_external_registry_input_examples(post)
169
            ),
170
            responses =>
171
                #{
172
                    201 =>
173
                        emqx_dashboard_swagger:schema_with_examples(
174
                            emqx_schema_registry_schema:external_registry_type(),
175
                            create_external_registry_input_examples(get)
176
                        ),
177
                    400 => error_schema('ALREADY_EXISTS', "Schema already exists")
178
                }
179
        }
180
    };
181
schema("/schema_registry_external/registry/:name") ->
182
    #{
54✔
183
        'operationId' => '/schema_registry_external/registry/:name',
184
        get => #{
185
            tags => ?TAGS,
186
            summary => <<"Lookup external registry">>,
187
            description => ?DESC("external_registry_lookup"),
188
            parameters => [param_path_external_registry_name()],
189
            responses =>
190
                #{
191
                    200 =>
192
                        emqx_dashboard_swagger:schema_with_examples(
193
                            emqx_schema_registry_schema:external_registry_type(),
194
                            create_external_registry_input_examples(put)
195
                        ),
196
                    404 => error_schema('NOT_FOUND', "Schema not found")
197
                }
198
        },
199
        put => #{
200
            tags => ?TAGS,
201
            summary => <<"Update external registry">>,
202
            description => ?DESC("external_registry_update"),
203
            parameters => [param_path_external_registry_name()],
204
            'requestBody' => emqx_dashboard_swagger:schema_with_examples(
205
                emqx_schema_registry_schema:external_registry_type(),
206
                create_external_registry_input_examples(put)
207
            ),
208
            responses =>
209
                #{
210
                    200 =>
211
                        emqx_dashboard_swagger:schema_with_examples(
212
                            emqx_schema_registry_schema:external_registry_type(),
213
                            create_external_registry_input_examples(put)
214
                        ),
215
                    404 => error_schema('NOT_FOUND', "Schema not found")
216
                }
217
        },
218
        delete => #{
219
            tags => ?TAGS,
220
            summary => <<"Delete external registry">>,
221
            description => ?DESC("external_registry_delete"),
222
            parameters => [param_path_external_registry_name()],
223
            responses => #{204 => <<"Deleted">>}
224
        }
225
    }.
226

227
%%-------------------------------------------------------------------------------------------------
228
%% API
229
%%-------------------------------------------------------------------------------------------------
230

231
'/schema_registry'(get, _Params) ->
232
    Schemas = emqx_schema_registry:list_schemas(),
6✔
233
    Response =
6✔
234
        lists:map(
235
            fun({Name, Params}) ->
236
                Params#{name => Name}
3✔
237
            end,
238
            maps:to_list(Schemas)
239
        ),
240
    ?OK(Response);
6✔
241
'/schema_registry'(post, #{body := Params0 = #{<<"name">> := Name}}) ->
242
    try
18✔
243
        ok = emqx_resource:validate_name(Name),
18✔
244
        Params = maps:without([<<"name">>], Params0),
12✔
245
        case emqx_schema_registry:get_schema(Name) of
12✔
246
            {error, not_found} ->
247
                case emqx_schema_registry:add_schema(Name, Params) of
9✔
248
                    ok ->
249
                        {ok, Res} = emqx_schema_registry:get_schema(Name),
6✔
250
                        {201, Res#{name => Name}};
6✔
251
                    {error, Error} ->
252
                        ?BAD_REQUEST(Error)
3✔
253
                end;
254
            {ok, _} ->
255
                ?BAD_REQUEST('ALREADY_EXISTS', <<"Schema already exists">>)
3✔
256
        end
257
    catch
258
        throw:#{kind := Kind, reason := Reason} ->
259
            Msg0 = ?ERROR_MSG('BAD_REQUEST', Reason),
6✔
260
            Msg = Msg0#{kind => Kind},
6✔
261
            {400, Msg}
6✔
262
    end.
263

264
'/schema_registry/:name'(get, #{bindings := #{name := Name}}) ->
265
    case emqx_schema_registry:get_schema(Name) of
14✔
266
        {error, not_found} ->
267
            ?NOT_FOUND(<<"Schema not found">>);
9✔
268
        {ok, Schema} when ?IS_TYPE_WITH_RESOURCE(Schema) ->
269
            add_resource_info(Name, Schema);
2✔
270
        {ok, Schema} ->
271
            ?OK(Schema#{name => Name})
3✔
272
    end;
273
'/schema_registry/:name'(put, #{bindings := #{name := Name}, body := Params}) ->
274
    case emqx_schema_registry:get_schema(Name) of
10✔
275
        {error, not_found} ->
276
            ?NOT_FOUND(<<"Schema not found">>);
3✔
277
        {ok, _} ->
278
            case emqx_schema_registry:add_schema(Name, Params) of
7✔
279
                ok ->
280
                    {ok, Res} = emqx_schema_registry:get_schema(Name),
4✔
281
                    ?OK(Res#{name => Name});
4✔
282
                {error, Error} ->
283
                    ?BAD_REQUEST(Error)
3✔
284
            end
285
    end;
286
'/schema_registry/:name'(delete, #{bindings := #{name := Name}}) ->
287
    case emqx_schema_registry:get_schema(Name) of
7✔
288
        {error, not_found} ->
289
            ?NOT_FOUND(<<"Schema not found">>);
3✔
290
        {ok, _} ->
291
            case emqx_schema_registry:delete_schema(Name) of
4✔
292
                ok ->
293
                    ?NO_CONTENT;
4✔
294
                {error, Error} ->
295
                    ?BAD_REQUEST(Error)
×
296
            end
297
    end.
298

299
%% External registries
300
'/schema_registry_external'(get, _Params) ->
301
    Registries0 = emqx_schema_registry_config:list_external_registries_raw(),
5✔
302
    Registries = maps:map(
5✔
303
        fun(_Name, Registry) -> emqx_utils:redact(Registry) end,
5✔
304
        Registries0
305
    ),
306
    ?OK(Registries);
5✔
307
'/schema_registry_external'(post, #{body := Params0 = #{<<"name">> := Name}}) ->
308
    Params = maps:remove(<<"name">>, Params0),
4✔
309
    with_external_registry(
4✔
310
        Name,
311
        fun() ->
312
            ?BAD_REQUEST(<<"External registry already exists">>)
×
313
        end,
314
        fun() ->
315
            case emqx_schema_registry_config:upsert_external_registry(Name, Params) of
4✔
316
                {ok, Registry} ->
317
                    ?CREATED(external_registry_out(Registry));
4✔
318
                {error, Reason} ->
319
                    ?BAD_REQUEST(Reason)
×
320
            end
321
        end
322
    ).
323

324
'/schema_registry_external/registry/:name'(get, #{bindings := #{name := Name}}) ->
325
    with_external_registry(
6✔
326
        Name,
327
        fun(Registry) ->
328
            ?OK(external_registry_out(Registry))
3✔
329
        end,
330
        not_found()
331
    );
332
'/schema_registry_external/registry/:name'(put, #{bindings := #{name := Name}, body := Params}) ->
333
    with_external_registry(
4✔
334
        Name,
335
        fun() ->
336
            case emqx_schema_registry_config:upsert_external_registry(Name, Params) of
1✔
337
                {ok, Registry} ->
338
                    ?OK(external_registry_out(Registry));
1✔
339
                {error, Reason} ->
340
                    ?BAD_REQUEST(Reason)
×
341
            end
342
        end,
343
        not_found()
344
    );
345
'/schema_registry_external/registry/:name'(delete, #{bindings := #{name := Name}}) ->
346
    with_external_registry(
4✔
347
        Name,
348
        fun() ->
349
            case emqx_schema_registry_config:delete_external_registry(Name) of
1✔
350
                ok ->
351
                    ?NO_CONTENT;
1✔
352
                {error, Reason} ->
353
                    ?BAD_REQUEST(Reason)
×
354
            end
355
        end,
356
        fun() -> ?NO_CONTENT end
3✔
357
    ).
358

359
%%-------------------------------------------------------------------------------------------------
360
%% Examples
361
%%-------------------------------------------------------------------------------------------------
362

363
sample_list_schemas_response() ->
364
    [sample_get_schema_response(avro)].
75✔
365

366
sample_get_schema_response(avro) ->
367
    #{
435✔
368
        type => <<"avro">>,
369
        name => <<"my_avro_schema">>,
370
        description => <<"My Avro Schema">>,
371
        source => <<
372
            "{\"type\":\"record\","
373
            "\"name\":\"test\","
374
            "\"fields\":[{\"type\":\"int\",\"name\":\"i\"},"
375
            "{\"type\":\"string\",\"name\":\"s\"}]}"
376
        >>
377
    }.
378

379
sample_get_schema_response(avro, put) ->
380
    maps:without([name], sample_get_schema_response(avro));
70✔
381
sample_get_schema_response(avro, _Method) ->
382
    sample_get_schema_response(avro).
290✔
383

384
put_examples() ->
385
    example_template(put).
70✔
386

387
post_examples() ->
388
    example_template(post).
220✔
389

390
get_examples() ->
391
    example_template(get).
70✔
392

393
example_template(Method) ->
394
    #{
360✔
395
        <<"avro_schema">> =>
396
            #{
397
                summary => <<"Avro">>,
398
                value => sample_get_schema_response(avro, Method)
399
            }
400
    }.
401

402
sample_list_external_registries_response() ->
403
    #{<<"my_registry">> => sample_get_external_registry_response(confluent)}.
49✔
404

405
sample_get_external_registry_response(confluent) ->
406
    #{
309✔
407
        type => <<"confluent">>,
408
        name => <<"test">>,
409
        url => <<"http://confluent_schema_registry:8081">>,
410
        auth => #{
411
            mechanism => <<"basic">>,
412
            username => <<"cpsruser">>,
413
            password => <<"******">>
414
        }
415
    }.
416

417
sample_get_external_registry_response(confluent, put) ->
418
    maps:without([name], sample_get_external_registry_response(confluent));
162✔
419
sample_get_external_registry_response(confluent, _Method) ->
420
    sample_get_external_registry_response(confluent).
98✔
421

422
create_external_registry_input_examples(Method) ->
423
    #{
260✔
424
        <<"confluent">> =>
425
            #{
426
                summary => <<"Confluent">>,
427
                value => sample_get_external_registry_response(confluent, Method)
428
            }
429
    }.
430

431
%%-------------------------------------------------------------------------------------------------
432
%% Schemas and hocon types
433
%%-------------------------------------------------------------------------------------------------
434

435
param_path_schema_name() ->
436
    {name,
210✔
437
        mk(
438
            binary(),
439
            #{
440
                in => path,
441
                required => true,
442
                example => <<"my_schema">>,
443
                desc => ?DESC("desc_param_path_schema_name")
444
            }
445
        )}.
446

447
param_path_external_registry_name() ->
448
    {name,
162✔
449
        mk(
450
            binary(),
451
            #{
452
                in => path,
453
                required => true,
454
                example => <<"my_registry">>,
455
                desc => ?DESC("param_path_external_registry_name")
456
            }
457
        )}.
458

459
%%-------------------------------------------------------------------------------------------------
460
%% BPAPI RPC Targets
461
%%-------------------------------------------------------------------------------------------------
462

463
-spec lookup_resource_from_local_node_v1(serde_type(), schema_name()) ->
464
    {ok, map()} | {error, not_found}.
465
lookup_resource_from_local_node_v1(_Type, Name) ->
466
    maybe
2✔
467
        {ok, #serde{eval_context = #{resource_id := ResourceId}}} ?=
2✔
468
            emqx_schema_registry:get_serde(Name),
469
        {ok, ?SCHEMA_REGISTRY_RESOURCE_GROUP, #{status := Status}} ?=
2✔
470
            emqx_resource:get_instance(ResourceId),
471
        Formatted = #{status => Status},
2✔
472
        {ok, Formatted}
2✔
473
    else
NEW
474
        _ -> {error, not_found}
×
475
    end.
476

477
%%-------------------------------------------------------------------------------------------------
478
%% Internal fns
479
%%-------------------------------------------------------------------------------------------------
480

481
mk(Type, Meta) -> hoconsc:mk(Type, Meta).
372✔
482

483
error_schema(Code, Message) when is_atom(Code) ->
484
    error_schema([Code], Message);
442✔
485
error_schema(Codes, Message) when is_list(Message) ->
486
    error_schema(Codes, list_to_binary(Message));
442✔
487
error_schema(Codes, Message) when is_list(Codes) andalso is_binary(Message) ->
488
    emqx_dashboard_swagger:error_codes(Codes, Message).
442✔
489

490
external_registry_out(Registry) ->
491
    emqx_utils:redact(Registry).
8✔
492

493
create_external_registry_union(all_union_members) ->
494
    ?UNION(UnionFn) = emqx_schema_registry_schema:external_registry_type(),
140✔
495
    Refs = UnionFn(all_union_members),
140✔
496
    lists:map(
140✔
497
        fun(?R_REF(emqx_schema_registry_schema, Name)) ->
498
            NameStr = emqx_utils_conv:str(Name),
140✔
499
            Struct = "external_registry_api_create_" ++ NameStr,
140✔
500
            ?R_REF(emqx_schema_registry_schema, Struct)
140✔
501
        end,
502
        Refs
503
    );
504
create_external_registry_union({value, V}) ->
505
    ?UNION(UnionFn) = emqx_schema_registry_schema:external_registry_type(),
5✔
506
    %% will throw if there's no match; always return single match
507
    [?R_REF(emqx_schema_registry_schema, Name)] = UnionFn({value, V}),
5✔
508
    NameStr = emqx_utils_conv:str(Name),
5✔
509
    Struct = "external_registry_api_create_" ++ NameStr,
5✔
510
    [?R_REF(emqx_schema_registry_schema, Struct)].
5✔
511

512
not_found() -> fun() -> ?NOT_FOUND(<<"External registry not found">>) end.
10✔
513

514
with_external_registry(Name, FoundFn, NotFoundFn) ->
515
    case emqx_schema_registry_config:lookup_external_registry_raw(Name) of
18✔
516
        {ok, _Registry} when is_function(FoundFn, 0) ->
517
            FoundFn();
2✔
518
        {ok, Registry} when is_function(FoundFn, 1) ->
519
            FoundFn(Registry);
3✔
520
        {error, not_found} ->
521
            NotFoundFn()
13✔
522
    end.
523

524
add_resource_info(Name, SchemaConfig) ->
525
    case lookup_resource_from_all_nodes(Name, SchemaConfig) of
2✔
526
        {ok, NodeResources} ->
527
            NodeStatus = nodes_status(NodeResources),
2✔
528
            Status = aggregate_status(NodeStatus),
2✔
529
            ?OK(SchemaConfig#{
2✔
530
                name => Name,
531
                status => Status,
532
                node_status => NodeStatus
533
            });
534
        {error, NodeErrors} ->
NEW
535
            ?INTERNAL_ERROR(NodeErrors)
×
536
    end.
537

538
nodes_status(NodeResources) ->
539
    lists:map(
2✔
540
        fun({Node, Res}) -> {Node, maps:get(status, Res, undefined)} end,
2✔
541
        maps:to_list(NodeResources)
542
    ).
543

544
aggregate_status(NodeStatus) ->
545
    AllStatus = lists:map(fun({_Node, Status}) -> Status end, NodeStatus),
2✔
546
    case lists:usort(AllStatus) of
2✔
547
        [Status] ->
548
            Status;
2✔
549
        _ ->
NEW
550
            inconsistent
×
551
    end.
552

553
lookup_resource_from_all_nodes(Name, SchemaConfig) ->
554
    #{type := Type} = SchemaConfig,
2✔
555
    Nodes = nodes_supporting_bpapi_version(1),
2✔
556
    Results = emqx_schema_registry_http_api_proto_v1:lookup_resource_from_all_nodes(
2✔
557
        Nodes, Type, Name
558
    ),
559
    NodeResults = lists:zip(Nodes, Results),
2✔
560
    sequence_node_results(NodeResults).
2✔
561

562
nodes_supporting_bpapi_version(Vsn) ->
563
    [
2✔
564
        N
2✔
565
     || N <- emqx:running_nodes(),
2✔
566
        case emqx_bpapi:supported_version(N, ?BPAPI_NAME) of
2✔
NEW
567
            undefined -> false;
×
568
            NVsn when is_number(NVsn) -> NVsn >= Vsn
2✔
569
        end
570
    ].
571

572
sequence_node_results(NodeResults) ->
573
    {Ok, Error} =
2✔
574
        lists:foldl(
575
            fun
576
                ({Node, {ok, {ok, Val}}}, {OkAcc, ErrAcc}) ->
577
                    {OkAcc#{Node => Val}, ErrAcc};
2✔
578
                ({Node, {ok, Error}}, {OkAcc, ErrAcc}) ->
NEW
579
                    {OkAcc, ErrAcc#{Node => Error}};
×
580
                ({Node, Error}, {OkAcc, ErrAcc}) ->
NEW
581
                    {OkAcc, ErrAcc#{Node => Error}}
×
582
            end,
583
            {#{}, #{}},
584
            NodeResults
585
        ),
586
    EmptyResults = map_size(Ok) == 0,
2✔
587
    case map_size(Error) == 0 of
2✔
588
        true when not EmptyResults ->
589
            {ok, Ok};
2✔
590
        true ->
NEW
591
            {error, empty_results};
×
592
        false ->
NEW
593
            {error, Error}
×
594
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc