• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 13023435557

29 Jan 2025 02:24AM UTC coverage: 82.621%. First build
13023435557

Pull #14623

github

web-flow
Merge 3d53c9549 into 02c116efb
Pull Request #14623: Sync release-58

158 of 186 new or added lines in 14 files covered. (84.95%)

58190 of 70430 relevant lines covered (82.62%)

15367.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/apps/emqx_schema_validation/src/emqx_schema_validation.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2024-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4
-module(emqx_schema_validation).
5

6
-feature(maybe_expr, enable).
7

8
-include_lib("snabbkaffe/include/trace.hrl").
9
-include_lib("emqx_utils/include/emqx_message.hrl").
10
-include_lib("emqx/include/emqx_hooks.hrl").
11
-include_lib("emqx/include/logger.hrl").
12

13
%% API
14
-export([
15
    list/0,
16
    reorder/1,
17
    lookup/1,
18
    insert/1,
19
    update/1,
20
    delete/1
21
]).
22

23
%% `emqx_hooks' API
24
-export([
25
    register_hooks/0,
26
    unregister_hooks/0,
27

28
    on_message_publish/1
29
]).
30

31
%% Internal exports
32
-export([parse_sql_check/1]).
33

34
%% Internal functions; exported for tests
35
-export([
36
    evaluate_sql_check/3
37
]).
38

39
%%------------------------------------------------------------------------------
40
%% Type declarations
41
%%------------------------------------------------------------------------------
42

43
-define(TRACE_TAG, "SCHEMA_VALIDATION").
44

45
-type validation_name() :: binary().
46
-type raw_validation() :: #{binary() => _}.
47
-type validation() :: #{
48
    name := validation_name(),
49
    strategy := all_pass | any_pass,
50
    failure_action := drop | disconnect | ignore,
51
    log_failure := #{level := error | warning | notice | info | debug | none}
52
}.
53

54
-export_type([
55
    validation/0,
56
    validation_name/0
57
]).
58

59
%%------------------------------------------------------------------------------
60
%% API
61
%%------------------------------------------------------------------------------
62

63
-spec list() -> [validation()].
64
list() ->
65
    emqx_schema_validation_config:list().
37✔
66

67
-spec reorder([validation_name()]) ->
68
    {ok, _} | {error, _}.
69
reorder(Order) ->
70
    emqx_schema_validation_config:reorder(Order).
9✔
71

72
-spec lookup(validation_name()) -> {ok, validation()} | {error, not_found}.
73
lookup(Name) ->
74
    emqx_schema_validation_config:lookup(Name).
229✔
75

76
-spec insert(raw_validation()) ->
77
    {ok, _} | {error, _}.
78
insert(Validation) ->
79
    emqx_schema_validation_config:insert(Validation).
86✔
80

81
-spec update(raw_validation()) ->
82
    {ok, _} | {error, _}.
83
update(Validation) ->
84
    emqx_schema_validation_config:update(Validation).
22✔
85

86
-spec delete(validation_name()) ->
87
    {ok, _} | {error, _}.
88
delete(Name) ->
89
    emqx_schema_validation_config:delete(Name).
83✔
90

91
%%------------------------------------------------------------------------------
92
%% Hooks
93
%%------------------------------------------------------------------------------
94

95
-spec register_hooks() -> ok.
96
register_hooks() ->
97
    emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_SCHEMA_VALIDATION).
3✔
98

99
-spec unregister_hooks() -> ok.
100
unregister_hooks() ->
101
    emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).
1✔
102

103
-spec on_message_publish(emqx_types:message()) ->
104
    {ok, emqx_types:message()} | {stop, emqx_types:message()}.
105
on_message_publish(Message = #message{topic = Topic, headers = Headers}) ->
106
    case emqx_schema_validation_registry:matching_validations(Topic) of
369✔
107
        [] ->
108
            ok;
314✔
109
        Validations ->
110
            case run_validations(Validations, Message) of
54✔
111
                ok ->
112
                    emqx_metrics:inc('messages.validation_succeeded'),
19✔
113
                    {ok, Message};
19✔
114
                drop ->
115
                    emqx_metrics:inc('messages.validation_failed'),
27✔
116
                    {stop, Message#message{headers = Headers#{allow_publish => false}}};
27✔
117
                disconnect ->
118
                    emqx_metrics:inc('messages.validation_failed'),
8✔
119
                    {stop, Message#message{
8✔
120
                        headers = Headers#{
121
                            allow_publish => false,
122
                            should_disconnect => true
123
                        }
124
                    }}
125
            end
126
    end.
127

128
%%------------------------------------------------------------------------------
129
%% Internal exports
130
%%------------------------------------------------------------------------------
131

132
parse_sql_check(SQL) ->
133
    case emqx_rule_sqlparser:parse(SQL, #{with_from => false}) of
2,866✔
134
        {ok, Select} ->
135
            case emqx_rule_sqlparser:select_is_foreach(Select) of
2,865✔
136
                true ->
137
                    {error, foreach_not_allowed};
1✔
138
                false ->
139
                    Check = #{
2,864✔
140
                        type => sql,
141
                        fields => emqx_rule_sqlparser:select_fields(Select),
142
                        conditions => emqx_rule_sqlparser:select_where(Select)
143
                    },
144
                    {ok, Check}
2,864✔
145
            end;
146
        Error = {error, _} ->
147
            Error
1✔
148
    end.
149

150
%%------------------------------------------------------------------------------
151
%% Internal functions
152
%%------------------------------------------------------------------------------
153

154
evaluate_sql_check(Check, Validation, Message) ->
155
    #{
58✔
156
        fields := Fields,
157
        conditions := Conditions
158
    } = Check,
159
    #{name := Name} = Validation,
58✔
160
    {Data, _} = emqx_rule_events:eventmsg_publish(Message),
58✔
161
    try emqx_rule_runtime:evaluate_select(Fields, Data, Conditions) of
58✔
162
        {ok, _} ->
163
            true;
24✔
164
        false ->
165
            false
34✔
166
    catch
167
        throw:Reason ->
NEW
168
            trace_failure(Validation, validation_sql_check_throw, #{
×
169
                validation => Name,
170
                reason => Reason
171
            }),
172
            false;
×
173
        Class:Error:Stacktrace ->
NEW
174
            trace_failure(Validation, validation_sql_check_failure, #{
×
175
                validation => Name,
176
                kind => Class,
177
                reason => Error,
178
                stacktrace => Stacktrace
179
            }),
180
            false
×
181
    end.
182

183
evaluate_schema_check(Check, Validation, #message{payload = Data}) ->
184
    #{schema := SerdeName} = Check,
14✔
185
    #{name := Name} = Validation,
14✔
186
    ExtraArgs =
14✔
187
        case Check of
188
            #{type := protobuf, message_type := MessageType} ->
189
                [MessageType];
6✔
190
            _ ->
191
                []
8✔
192
        end,
193
    try
14✔
194
        emqx_schema_registry_serde:schema_check(SerdeName, Data, ExtraArgs)
14✔
195
    catch
196
        error:{serde_not_found, _} ->
NEW
197
            trace_failure(Validation, validation_schema_check_schema_not_found, #{
×
198
                validation => Name,
199
                schema_name => SerdeName
200
            }),
201
            false;
×
202
        Class:Error:Stacktrace ->
203
            trace_failure(Validation, validation_schema_check_failure, #{
2✔
204
                validation => Name,
205
                schema_name => SerdeName,
206
                kind => Class,
207
                reason => Error,
208
                stacktrace => Stacktrace
209
            }),
210
            false
2✔
211
    end.
212

213
run_validations(Validations, Message) ->
214
    try
54✔
215
        emqx_rule_runtime:clear_rule_payload(),
54✔
216
        Fun = fun(Validation, Acc) ->
54✔
217
            #{name := Name} = Validation,
60✔
218
            emqx_schema_validation_registry:inc_matched(Name),
60✔
219
            case run_validation(Validation, Message) of
60✔
220
                ok ->
221
                    emqx_schema_validation_registry:inc_succeeded(Name),
24✔
222
                    {cont, Acc};
24✔
223
                ignore ->
224
                    trace_failure(Validation, validation_failed, #{
1✔
225
                        validation => Name,
226
                        action => ignore
227
                    }),
228
                    emqx_schema_validation_registry:inc_failed(Name),
1✔
229
                    run_schema_validation_failed_hook(Message, Validation),
1✔
230
                    {cont, Acc};
1✔
231
                FailureAction ->
232
                    trace_failure(Validation, validation_failed, #{
35✔
233
                        validation => Name,
234
                        action => FailureAction
235
                    }),
236
                    emqx_schema_validation_registry:inc_failed(Name),
35✔
237
                    run_schema_validation_failed_hook(Message, Validation),
35✔
238
                    {halt, FailureAction}
35✔
239
            end
240
        end,
241
        emqx_utils:foldl_while(Fun, _Passed = ok, Validations)
54✔
242
    after
243
        emqx_rule_runtime:clear_rule_payload()
54✔
244
    end.
245

246
run_validation(#{strategy := all_pass} = Validation, Message) ->
247
    #{
57✔
248
        checks := Checks,
249
        failure_action := FailureAction
250
    } = Validation,
251
    Fun = fun(Check, Acc) ->
57✔
252
        case run_check(Check, Validation, Message) of
59✔
253
            true -> {cont, Acc};
24✔
254
            false -> {halt, FailureAction}
35✔
255
        end
256
    end,
257
    emqx_utils:foldl_while(Fun, _Passed = ok, Checks);
57✔
258
run_validation(#{strategy := any_pass} = Validation, Message) ->
259
    #{
3✔
260
        checks := Checks,
261
        failure_action := FailureAction
262
    } = Validation,
263
    case lists:any(fun(C) -> run_check(C, Validation, Message) end, Checks) of
3✔
264
        true ->
265
            ok;
2✔
266
        false ->
267
            FailureAction
1✔
268
    end.
269

270
run_check(#{type := sql} = Check, Validation, Message) ->
271
    evaluate_sql_check(Check, Validation, Message);
49✔
272
run_check(Check, Validation, Message) ->
273
    evaluate_schema_check(Check, Validation, Message).
14✔
274

275
trace_failure(#{log_failure := #{level := none}} = Validation, _Msg, _Meta) ->
276
    #{
1✔
277
        name := _Name,
278
        failure_action := _Action
279
    } = Validation,
280
    ?tp(schema_validation_failed, #{log_level => none, name => _Name, action => _Action}),
1✔
281
    ok;
1✔
282
trace_failure(#{log_failure := #{level := Level}} = Validation, Msg, Meta) when is_atom(Msg) ->
283
    #{
37✔
284
        name := Name,
285
        failure_action := Action
286
    } = Validation,
287
    ?tp(schema_validation_failed, #{log_level => Level, name => Name, action => Action}),
37✔
288
    ?SLOG_THROTTLE(Level, Name, #{msg => Msg, name => Name, action => Action}, Meta#{
37✔
289
        tag => ?TRACE_TAG
NEW
290
    }).
×
291

292
run_schema_validation_failed_hook(Message, Validation) ->
293
    #{name := Name} = Validation,
36✔
294
    ValidationContext = #{name => Name},
36✔
295
    emqx_hooks:run('schema.validation_failed', [Message, ValidationContext]).
36✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc