• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 13979276088

20 Mar 2025 09:02PM UTC coverage: 83.253%. First build
13979276088

Pull #14901

github

web-flow
Merge 7b2283018 into 6c32718fc
Pull Request #14901: feat: port external HTTP serde from 4.x to schema registry

113 of 142 new or added lines in 7 files covered. (79.58%)

61708 of 74121 relevant lines covered (83.25%)

16593.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.5
/apps/emqx_message_transformation/src/emqx_message_transformation.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2024-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4
-module(emqx_message_transformation).
5

6
-feature(maybe_expr, enable).
7

8
-include_lib("snabbkaffe/include/trace.hrl").
9
-include_lib("emqx_utils/include/emqx_message.hrl").
10
-include_lib("emqx/include/emqx_hooks.hrl").
11
-include_lib("emqx/include/logger.hrl").
12

13
%% API
14
-export([
15
    list/0,
16
    reorder/1,
17
    lookup/1,
18
    insert/1,
19
    update/1,
20
    delete/1
21
]).
22

23
%% `emqx_hooks' API
24
-export([
25
    register_hooks/0,
26
    unregister_hooks/0,
27

28
    on_message_publish/1
29
]).
30

31
%% Internal exports
32
-export([run_transformation/2, trace_failure_context_to_map/1, prettify_operation/1]).
33

34
%%------------------------------------------------------------------------------
35
%% Type declarations
36
%%------------------------------------------------------------------------------
37

38
-define(TRACE_TAG, "MESSAGE_TRANSFORMATION").
39

40
-record(trace_failure_context, {
41
    transformation :: transformation(),
42
    tag :: atom(),
43
    context :: map()
44
}).
45
-type trace_failure_context() :: #trace_failure_context{}.
46

47
-type transformation_name() :: binary().
48
%% TODO: make more specific typespec
49
-type transformation() :: #{atom() => term()}.
50
%% TODO: make more specific typespec
51
-type variform() :: any().
52
-type failure_action() :: ignore | drop | disconnect.
53
-type operation() :: #{key := [binary(), ...], value := variform()}.
54
-type qos() :: 0..2.
55
-type rendered_value() :: qos() | boolean() | binary().
56

57
-type eval_context() :: #{
58
    client_attrs := map(),
59
    clientid := _,
60
    flags := _,
61
    id := _,
62
    node := _,
63
    payload := _,
64
    peername := _,
65
    pub_props := _,
66
    publish_received_at := _,
67
    qos := _,
68
    retain := _,
69
    timestamp := _,
70
    topic := _,
71
    user_property := _,
72
    username := _,
73
    dirty := #{
74
        payload => true,
75
        qos => true,
76
        retain => true,
77
        topic => true,
78
        user_property => true
79
    }
80
}.
81

82
-export_type([
83
    transformation/0,
84
    transformation_name/0,
85
    failure_action/0
86
]).
87

88
%%------------------------------------------------------------------------------
89
%% API
90
%%------------------------------------------------------------------------------
91

92
-spec list() -> [transformation()].
93
list() ->
94
    emqx_message_transformation_config:list().
40✔
95

96
-spec reorder([transformation_name()]) ->
97
    {ok, _} | {error, _}.
98
reorder(Order) ->
99
    emqx_message_transformation_config:reorder(Order).
9✔
100

101
-spec lookup(transformation_name()) -> {ok, transformation()} | {error, not_found}.
102
lookup(Name) ->
103
    emqx_message_transformation_config:lookup(Name).
243✔
104

105
-spec insert(transformation()) ->
106
    {ok, _} | {error, _}.
107
insert(Transformation) ->
108
    emqx_message_transformation_config:insert(Transformation).
87✔
109

110
-spec update(transformation()) ->
111
    {ok, _} | {error, _}.
112
update(Transformation) ->
113
    emqx_message_transformation_config:update(Transformation).
24✔
114

115
-spec delete(transformation_name()) ->
116
    {ok, _} | {error, _}.
117
delete(Name) ->
118
    emqx_message_transformation_config:delete(Name).
88✔
119

120
%%------------------------------------------------------------------------------
121
%% Hooks
122
%%------------------------------------------------------------------------------
123

124
-spec register_hooks() -> ok.
125
register_hooks() ->
126
    emqx_hooks:put(
3✔
127
        'message.publish', {?MODULE, on_message_publish, []}, ?HP_MESSAGE_TRANSFORMATION
128
    ).
129

130
-spec unregister_hooks() -> ok.
131
unregister_hooks() ->
132
    emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).
3✔
133

134
-spec on_message_publish(emqx_types:message()) ->
135
    {ok, emqx_types:message()} | {stop, emqx_types:message()}.
136
on_message_publish(Message = #message{topic = Topic}) ->
137
    case emqx_message_transformation_registry:matching_transformations(Topic) of
373✔
138
        [] ->
139
            ok;
322✔
140
        Transformations ->
141
            run_transformations(Transformations, Message)
51✔
142
    end.
143

144
%%------------------------------------------------------------------------------
145
%% Internal exports
146
%%------------------------------------------------------------------------------
147

148
-spec run_transformation(transformation(), emqx_types:message()) ->
149
    {ok, emqx_types:message()} | {failure_action(), trace_failure_context()}.
150
run_transformation(Transformation, MessageIn) ->
151
    #{
64✔
152
        operations := Operations,
153
        failure_action := FailureAction,
154
        payload_decoder := PayloadDecoder
155
    } = Transformation,
156
    Fun = fun(Operation, Acc) ->
64✔
157
        case eval_operation(Operation, Transformation, Acc) of
117✔
158
            {ok, NewAcc} -> {cont, NewAcc};
99✔
159
            {error, TraceFailureContext} -> {halt, {error, TraceFailureContext}}
18✔
160
        end
161
    end,
162
    PayloadIn = MessageIn#message.payload,
64✔
163
    case decode(PayloadIn, PayloadDecoder, Transformation) of
64✔
164
        {ok, InitPayload} ->
165
            InitAcc = message_to_context(MessageIn, InitPayload, Transformation),
53✔
166
            case emqx_utils:foldl_while(Fun, InitAcc, Operations) of
53✔
167
                #{} = ContextOut ->
168
                    context_to_message(MessageIn, ContextOut, Transformation);
35✔
169
                {error, TraceFailureContext} ->
170
                    {FailureAction, TraceFailureContext}
18✔
171
            end;
172
        {error, TraceFailureContext} ->
173
            {FailureAction, TraceFailureContext}
11✔
174
    end.
175

176
prettify_operation(Operation0) ->
177
    %% TODO: remove injected bif module
178
    Operation = maps:update_with(
216✔
179
        value,
180
        fun(V) -> iolist_to_binary(emqx_variform:decompile(V)) end,
216✔
181
        Operation0
182
    ),
183
    maps:update_with(
216✔
184
        key,
185
        fun(Path) -> iolist_to_binary(lists:join(".", Path)) end,
216✔
186
        Operation
187
    ).
188

189
%%------------------------------------------------------------------------------
190
%% Internal functions
191
%%------------------------------------------------------------------------------
192

193
-spec eval_operation(operation(), transformation(), eval_context()) ->
194
    {ok, eval_context()} | {error, trace_failure_context()}.
195
eval_operation(Operation, Transformation, Context) ->
196
    #{key := K, value := V} = Operation,
117✔
197
    try
117✔
198
        case eval_variform(K, V, Context) of
117✔
199
            {error, Reason} ->
200
                FailureContext = #trace_failure_context{
17✔
201
                    transformation = Transformation,
202
                    tag = transformation_eval_operation_failure,
203
                    context = #{reason => Reason}
204
                },
205
                {error, FailureContext};
17✔
206
            {ok, Rendered} ->
207
                NewContext = put_value(K, Rendered, Context),
100✔
208
                {ok, NewContext}
99✔
209
        end
210
    catch
211
        Class:Error:Stacktrace ->
212
            FailureContext1 = #trace_failure_context{
1✔
213
                transformation = Transformation,
214
                tag = transformation_eval_operation_exception,
215
                context = #{
216
                    kind => Class,
217
                    reason => Error,
218
                    stacktrace => Stacktrace,
219
                    operation => prettify_operation(Operation)
220
                }
221
            },
222
            {error, FailureContext1}
1✔
223
    end.
224

225
-spec eval_variform([binary(), ...], _, eval_context()) ->
226
    {ok, rendered_value()} | {error, term()}.
227
eval_variform(K, V, Context) ->
228
    Opts =
117✔
229
        case K of
230
            [<<"payload">> | _] ->
231
                #{eval_as_string => false};
52✔
232
            _ ->
233
                #{}
65✔
234
        end,
235
    case emqx_variform:render(V, Context, Opts) of
117✔
236
        {error, Reason} ->
237
            {error, Reason};
17✔
238
        {ok, Rendered} ->
239
            map_result(Rendered, K)
100✔
240
    end.
241

242
-spec put_value([binary(), ...], rendered_value(), eval_context()) -> eval_context().
243
put_value([<<"payload">> | Rest], Rendered, Context0) ->
244
    Context = maps:update_with(dirty, fun(D) -> D#{payload => true} end, Context0),
52✔
245
    maps:update_with(
52✔
246
        payload,
247
        fun(P) ->
248
            case Rest of
52✔
249
                [] ->
250
                    Rendered;
10✔
251
                _ ->
252
                    emqx_utils_maps:deep_put(Rest, P, Rendered)
42✔
253
            end
254
        end,
255
        Context
256
    );
257
put_value([<<"user_property">>, Key], Rendered, Context0) ->
258
    Context = maps:update_with(dirty, fun(D) -> D#{user_property => true} end, Context0),
14✔
259
    maps:update_with(
14✔
260
        user_property,
261
        fun(Ps) -> maps:put(Key, Rendered, Ps) end,
14✔
262
        Context
263
    );
264
put_value([<<"qos">>], Rendered, Context0) ->
265
    Context = maps:update_with(dirty, fun(D) -> D#{qos => true} end, Context0),
8✔
266
    Context#{qos := Rendered};
8✔
267
put_value([<<"retain">>], Rendered, Context0) ->
268
    Context = maps:update_with(dirty, fun(D) -> D#{retain => true} end, Context0),
7✔
269
    Context#{retain := Rendered};
7✔
270
put_value([<<"topic">>], Rendered, Context0) ->
271
    Context = maps:update_with(dirty, fun(D) -> D#{topic => true} end, Context0),
19✔
272
    Context#{topic := Rendered}.
19✔
273

274
-spec map_result(binary(), [binary(), ...]) ->
275
    {ok, 0..2 | boolean() | binary()} | {error, map()}.
276
map_result(QoSBin, [<<"qos">>]) ->
277
    case QoSBin of
8✔
278
        <<"0">> -> {ok, 0};
2✔
279
        <<"1">> -> {ok, 1};
4✔
280
        <<"2">> -> {ok, 2};
2✔
281
        _ -> {error, #{reason => bad_qos_value, input => QoSBin}}
×
282
    end;
283
map_result(RetainBin, [<<"retain">>]) ->
284
    case RetainBin of
7✔
285
        <<"true">> -> {ok, true};
7✔
286
        <<"false">> -> {ok, false};
×
287
        _ -> {error, #{reason => bad_retain_value, input => RetainBin}}
×
288
    end;
289
map_result(Rendered, _Key) ->
290
    {ok, Rendered}.
85✔
291

292
run_transformations(Transformations, Message = #message{headers = Headers}) ->
293
    case do_run_transformations(Transformations, Message) of
51✔
294
        #message{} = FinalMessage ->
295
            emqx_metrics:inc('messages.transformation_succeeded'),
19✔
296
            {ok, FinalMessage};
19✔
297
        drop ->
298
            emqx_metrics:inc('messages.transformation_failed'),
25✔
299
            {stop, Message#message{headers = Headers#{allow_publish => false}}};
25✔
300
        disconnect ->
301
            emqx_metrics:inc('messages.transformation_failed'),
7✔
302
            {stop, Message#message{
7✔
303
                headers = Headers#{
304
                    allow_publish => false,
305
                    should_disconnect => true
306
                }
307
            }}
308
    end.
309

310
do_run_transformations(Transformations, Message) ->
311
    LastTransformation = #{name := LastTransformationName} = lists:last(Transformations),
51✔
312
    Fun = fun(Transformation, MessageAcc) ->
51✔
313
        #{name := Name} = Transformation,
60✔
314
        emqx_message_transformation_registry:inc_matched(Name),
60✔
315
        case run_transformation(Transformation, MessageAcc) of
60✔
316
            {ok, #message{} = NewAcc} ->
317
                %% If this is the last transformation, we can't bump its success counter
318
                %% yet.  We perform a check to see if the final payload is encoded as a
319
                %% binary after all transformations have run, and it's the last
320
                %% transformation's responsibility to properly encode it.
321
                case Name =:= LastTransformationName of
29✔
322
                    true ->
323
                        ok;
20✔
324
                    false ->
325
                        emqx_message_transformation_registry:inc_succeeded(Name)
9✔
326
                end,
327
                {cont, NewAcc};
29✔
328
            {ignore, TraceFailureContext} ->
329
                trace_failure_from_context(TraceFailureContext),
1✔
330
                emqx_message_transformation_registry:inc_failed(Name),
1✔
331
                run_message_transformation_failed_hook(Message, Transformation),
1✔
332
                {cont, MessageAcc};
1✔
333
            {FailureAction, TraceFailureContext} ->
334
                trace_failure_from_context(TraceFailureContext),
30✔
335
                trace_failure(Transformation, transformation_failed, #{
30✔
336
                    transformation => Name,
337
                    action => FailureAction
338
                }),
339
                emqx_message_transformation_registry:inc_failed(Name),
30✔
340
                run_message_transformation_failed_hook(Message, Transformation),
30✔
341
                {halt, FailureAction}
30✔
342
        end
343
    end,
344
    case emqx_utils:foldl_while(Fun, Message, Transformations) of
51✔
345
        #message{} = FinalMessage ->
346
            case is_payload_properly_encoded(FinalMessage) of
21✔
347
                true ->
348
                    emqx_message_transformation_registry:inc_succeeded(LastTransformationName),
19✔
349
                    FinalMessage;
19✔
350
                false ->
351
                    %% Take the last validation's failure action, as it's the one
352
                    %% responsible for getting the right encoding.
353
                    emqx_message_transformation_registry:inc_failed(LastTransformationName),
2✔
354
                    #{failure_action := FailureAction} = LastTransformation,
2✔
355
                    trace_failure(LastTransformation, transformation_bad_encoding, #{
2✔
356
                        action => FailureAction,
357
                        explain => <<"final payload must be encoded as a binary">>
358
                    }),
359
                    FailureAction
2✔
360
            end;
361
        FailureAction ->
362
            FailureAction
30✔
363
    end.
364

365
-spec message_to_context(emqx_types:message(), _Payload, transformation()) -> eval_context().
366
message_to_context(#message{} = Message, Payload, Transformation) ->
367
    #{
53✔
368
        payload_decoder := PayloadDecoder,
369
        payload_encoder := PayloadEncoder
370
    } = Transformation,
371
    Dirty =
53✔
372
        case PayloadEncoder =:= PayloadDecoder of
373
            true -> #{};
48✔
374
            false -> #{payload => true}
5✔
375
        end,
376
    Flags = emqx_message:get_flags(Message),
53✔
377
    Props = emqx_message:get_header(properties, Message, #{}),
53✔
378
    UserProperties0 = maps:get('User-Property', Props, []),
53✔
379
    UserProperties = maps:from_list(UserProperties0),
53✔
380
    Headers = Message#message.headers,
53✔
381
    Peername =
53✔
382
        case maps:get(peername, Headers, undefined) of
383
            Peername0 when is_tuple(Peername0) ->
384
                iolist_to_binary(emqx_utils:ntoa(Peername0));
53✔
385
            _ ->
386
                undefined
×
387
        end,
388
    Username = maps:get(username, Headers, undefined),
53✔
389
    Timestamp = erlang:system_time(millisecond),
53✔
390
    #{
53✔
391
        dirty => Dirty,
392

393
        client_attrs => emqx_message:get_header(client_attrs, Message, #{}),
394
        clientid => Message#message.from,
395
        flags => Flags,
396
        id => emqx_guid:to_hexstr(Message#message.id),
397
        node => node(),
398
        payload => Payload,
399
        peername => Peername,
400
        pub_props => Props#{'User-Property' => UserProperties},
401
        publish_received_at => Message#message.timestamp,
402
        qos => Message#message.qos,
403
        retain => emqx_message:get_flag(retain, Message, false),
404
        timestamp => Timestamp,
405
        topic => Message#message.topic,
406
        user_property => UserProperties,
407
        username => Username
408
    }.
409

410
-spec context_to_message(emqx_types:message(), eval_context(), transformation()) ->
411
    {ok, emqx_types:message()} | {failure_action(), trace_failure_context()}.
412
context_to_message(Message, Context, Transformation) ->
413
    #{
35✔
414
        failure_action := FailureAction,
415
        payload_encoder := PayloadEncoder
416
    } = Transformation,
417
    #{payload := PayloadOut} = Context,
35✔
418
    case encode(PayloadOut, PayloadEncoder, Transformation) of
35✔
419
        {ok, Payload} ->
420
            {ok, take_from_context(Context#{payload := Payload}, Message)};
30✔
421
        {error, TraceFailureContext} ->
422
            {FailureAction, TraceFailureContext}
5✔
423
    end.
424

425
take_from_context(Context, Message) ->
426
    maps:fold(
30✔
427
        fun
428
            (payload, _, Acc) ->
429
                Acc#message{payload = maps:get(payload, Context)};
19✔
430
            (qos, _, Acc) ->
431
                Acc#message{qos = maps:get(qos, Context)};
7✔
432
            (topic, _, Acc) ->
433
                Acc#message{topic = maps:get(topic, Context)};
17✔
434
            (retain, _, Acc) ->
435
                emqx_message:set_flag(retain, maps:get(retain, Context), Acc);
7✔
436
            (user_property, _, Acc) ->
437
                Props0 = emqx_message:get_header(properties, Acc, #{}),
7✔
438
                UserProperties0 = maps:to_list(maps:get(user_property, Context)),
7✔
439
                UserProperties = lists:keysort(1, UserProperties0),
7✔
440
                Props = maps:merge(Props0, #{'User-Property' => UserProperties}),
7✔
441
                emqx_message:set_header(properties, Props, Acc)
7✔
442
        end,
443
        Message,
444
        maps:get(dirty, Context)
445
    ).
446

447
decode(Payload, #{type := none}, _Transformation) ->
448
    {ok, Payload};
4✔
449
decode(Payload, #{type := json}, Transformation) when is_binary(Payload) ->
450
    case emqx_utils_json:safe_decode(Payload) of
41✔
451
        {ok, JSON} ->
452
            {ok, JSON};
39✔
453
        {error, Reason} ->
454
            TraceFailureContext = #trace_failure_context{
2✔
455
                transformation = Transformation,
456
                tag = payload_decode_failed,
457
                context = #{
458
                    decoder => json,
459
                    reason => Reason
460
                }
461
            },
462
            {error, TraceFailureContext}
2✔
463
    end;
464
decode(Payload, #{type := avro, schema := SerdeName}, Transformation) when is_binary(Payload) ->
465
    try
7✔
466
        {ok, emqx_schema_registry_serde:decode(SerdeName, Payload)}
7✔
467
    catch
468
        error:{serde_not_found, _} ->
469
            TraceFailureContext = #trace_failure_context{
1✔
470
                transformation = Transformation,
471
                tag = payload_decode_schema_not_found,
472
                context = #{
473
                    decoder => avro,
474
                    schema_name => SerdeName
475
                }
476
            },
477
            {error, TraceFailureContext};
1✔
478
        Class:Error:Stacktrace ->
479
            TraceFailureContext = #trace_failure_context{
2✔
480
                transformation = Transformation,
481
                tag = payload_decode_schema_failure,
482
                context = #{
483
                    decoder => avro,
484
                    schema_name => SerdeName,
485
                    kind => Class,
486
                    reason => Error,
487
                    stacktrace => Stacktrace
488
                }
489
            },
490
            {error, TraceFailureContext}
2✔
491
    end;
492
decode(
493
    Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation
494
) ->
495
    try
10✔
496
        {ok, emqx_schema_registry_serde:decode(SerdeName, Payload, [MessageType])}
10✔
497
    catch
498
        error:{serde_not_found, _} ->
499
            TraceFailureContext = #trace_failure_context{
×
500
                transformation = Transformation,
501
                tag = payload_decode_schema_not_found,
502
                context = #{
503
                    decoder => protobuf,
504
                    schema_name => SerdeName,
505
                    message_type => MessageType
506
                }
507
            },
508
            {error, TraceFailureContext};
×
509
        throw:{schema_decode_error, ExtraContext} ->
510
            TraceFailureContext = #trace_failure_context{
1✔
511
                transformation = Transformation,
512
                tag = payload_decode_error,
513
                context = ExtraContext#{
514
                    decoder => protobuf,
515
                    schema_name => SerdeName,
516
                    message_type => MessageType
517
                }
518
            },
519
            {error, TraceFailureContext};
1✔
520
        Class:Error:Stacktrace ->
521
            TraceFailureContext = #trace_failure_context{
4✔
522
                transformation = Transformation,
523
                tag = payload_decode_schema_failure,
524
                context = #{
525
                    decoder => protobuf,
526
                    schema_name => SerdeName,
527
                    message_type => MessageType,
528
                    kind => Class,
529
                    reason => Error,
530
                    stacktrace => Stacktrace
531
                }
532
            },
533
            {error, TraceFailureContext}
4✔
534
    end;
535
decode(Payload, #{type := external_http, schema := SerdeName}, Transformation) when
536
    is_binary(Payload)
537
->
538
    try
1✔
539
        {ok, emqx_schema_registry_serde:decode(SerdeName, Payload)}
1✔
540
    catch
541
        error:{serde_not_found, _} ->
NEW
542
            TraceFailureContext = #trace_failure_context{
×
543
                transformation = Transformation,
544
                tag = payload_decode_schema_not_found,
545
                context = #{
546
                    decoder => external_http,
547
                    schema_name => SerdeName
548
                }
549
            },
NEW
550
            {error, TraceFailureContext};
×
551
        Class:Error:Stacktrace ->
NEW
552
            TraceFailureContext = #trace_failure_context{
×
553
                transformation = Transformation,
554
                tag = payload_decode_schema_failure,
555
                context = #{
556
                    decoder => external_http,
557
                    schema_name => SerdeName,
558
                    kind => Class,
559
                    reason => Error,
560
                    stacktrace => Stacktrace
561
                }
562
            },
NEW
563
            {error, TraceFailureContext}
×
564
    end;
565
decode(NotABinary, #{} = Decoder, Transformation) ->
566
    DecoderContext0 = maps:with([type, name, message_type], Decoder),
1✔
567
    DecoderContext1 = emqx_utils_maps:rename(name, schema_name, DecoderContext0),
1✔
568
    DecoderContext = emqx_utils_maps:rename(type, decoder, DecoderContext1),
1✔
569
    Context =
1✔
570
        maps:merge(
571
            DecoderContext,
572
            #{
573
                reason => <<"payload must be a binary">>,
574
                hint => <<"check the transformation(s) before this one for inconsistencies">>,
575
                bad_payload => NotABinary
576
            }
577
        ),
578
    TraceFailureContext = #trace_failure_context{
1✔
579
        transformation = Transformation,
580
        tag = payload_decode_failed,
581
        context = Context
582
    },
583
    {error, TraceFailureContext}.
1✔
584

585
encode(Payload, #{type := none}, _Transformation) ->
586
    {ok, Payload};
6✔
587
encode(Payload, #{type := json}, Transformation) ->
588
    case emqx_utils_json:safe_encode(Payload) of
19✔
589
        {ok, Bin} ->
590
            {ok, Bin};
19✔
591
        {error, Reason} ->
592
            TraceFailureContext = #trace_failure_context{
×
593
                transformation = Transformation,
594
                tag = payload_encode_failed,
595
                context = #{
596
                    encoder => json,
597
                    reason => Reason
598
                }
599
            },
600
            {error, TraceFailureContext}
×
601
    end;
602
encode(Payload, #{type := avro, schema := SerdeName}, Transformation) ->
603
    try
5✔
604
        {ok, emqx_schema_registry_serde:encode(SerdeName, Payload)}
5✔
605
    catch
606
        error:{serde_not_found, _} ->
607
            TraceFailureContext = #trace_failure_context{
1✔
608
                transformation = Transformation,
609
                tag = payload_encode_schema_not_found,
610
                context = #{
611
                    encoder => avro,
612
                    schema_name => SerdeName
613
                }
614
            },
615
            {error, TraceFailureContext};
1✔
616
        Class:Error:Stacktrace ->
617
            TraceFailureContext = #trace_failure_context{
2✔
618
                transformation = Transformation,
619
                tag = payload_encode_schema_failure,
620
                context = #{
621
                    encoder => avro,
622
                    schema_name => SerdeName,
623
                    kind => Class,
624
                    reason => Error,
625
                    stacktrace => Stacktrace
626
                }
627
            },
628
            {error, TraceFailureContext}
2✔
629
    end;
630
encode(
631
    Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation
632
) ->
633
    try
4✔
634
        {ok, emqx_schema_registry_serde:encode(SerdeName, Payload, [MessageType])}
4✔
635
    catch
636
        error:{serde_not_found, _} ->
637
            TraceFailureContext = #trace_failure_context{
×
638
                transformation = Transformation,
639
                tag = payload_encode_schema_failure,
640
                context = #{
641
                    encoder => protobuf,
642
                    schema_name => SerdeName,
643
                    message_type => MessageType
644
                }
645
            },
646
            {error, TraceFailureContext};
×
647
        Class:Error:Stacktrace ->
648
            TraceFailureContext = #trace_failure_context{
2✔
649
                transformation = Transformation,
650
                tag = payload_encode_schema_failure,
651
                context = #{
652
                    encoder => protobuf,
653
                    schema_name => SerdeName,
654
                    message_type => MessageType,
655
                    kind => Class,
656
                    reason => Error,
657
                    stacktrace => Stacktrace
658
                }
659
            },
660
            {error, TraceFailureContext}
2✔
661
    end;
662
encode(Payload, #{type := external_http, schema := SerdeName}, Transformation) ->
663
    try
1✔
664
        {ok, emqx_schema_registry_serde:encode(SerdeName, Payload)}
1✔
665
    catch
666
        error:{serde_not_found, _} ->
NEW
667
            TraceFailureContext = #trace_failure_context{
×
668
                transformation = Transformation,
669
                tag = payload_encode_schema_not_found,
670
                context = #{
671
                    encoder => external_http,
672
                    schema_name => SerdeName
673
                }
674
            },
NEW
675
            {error, TraceFailureContext};
×
676
        Class:Error:Stacktrace ->
NEW
677
            TraceFailureContext = #trace_failure_context{
×
678
                transformation = Transformation,
679
                tag = payload_encode_schema_failure,
680
                context = #{
681
                    encoder => external_http,
682
                    schema_name => SerdeName,
683
                    kind => Class,
684
                    reason => Error,
685
                    stacktrace => Stacktrace
686
                }
687
            },
NEW
688
            {error, TraceFailureContext}
×
689
    end.
690

691
trace_failure_from_context(
692
    #trace_failure_context{
693
        transformation = Transformation,
694
        tag = Tag,
695
        context = Context
696
    }
697
) ->
698
    trace_failure(Transformation, Tag, Context).
31✔
699

700
%% Internal export for HTTP API.
701
trace_failure_context_to_map(
702
    #trace_failure_context{
703
        tag = Tag,
704
        context = Context
705
    }
706
) ->
707
    Context#{msg => Tag}.
3✔
708

709
trace_failure(#{log_failure := #{level := none}} = Transformation, _Msg, _Meta) ->
710
    #{
2✔
711
        name := _Name,
712
        failure_action := _Action
713
    } = Transformation,
714
    ?tp(message_transformation_failed, _Meta#{log_level => none, name => _Name, message => _Msg}),
2✔
715
    ok;
2✔
716
trace_failure(#{log_failure := #{level := Level}} = Transformation, Msg, Meta0) when is_atom(Msg) ->
717
    #{
61✔
718
        name := Name,
719
        failure_action := Action
720
    } = Transformation,
721
    Meta = maps:merge(#{name => Name}, Meta0),
61✔
722
    ?tp(message_transformation_failed, Meta#{
61✔
723
        log_level => Level, name => Name, action => Action, message => Msg
724
    }),
725
    ?SLOG_THROTTLE(Level, Name, #{msg => Msg, name => Name, action => Action}, Meta#{
61✔
726
        tag => ?TRACE_TAG
727
    }).
×
728

729
run_message_transformation_failed_hook(Message, Transformation) ->
730
    #{name := Name} = Transformation,
31✔
731
    TransformationContext = #{name => Name},
31✔
732
    emqx_hooks:run('message.transformation_failed', [Message, TransformationContext]).
31✔
733

734
is_payload_properly_encoded(#message{payload = Payload}) ->
735
    try iolist_size(Payload) of
21✔
736
        _ ->
737
            true
19✔
738
    catch
739
        error:badarg ->
740
            false
2✔
741
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc