• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 12741272085

13 Jan 2025 05:33AM UTC coverage: 82.355%. First build
12741272085

Pull #14475

github

web-flow
Merge b56877530 into 0fc8025be
Pull Request #14475: refactor(limiter): refactor and simplify the limiter

199 of 249 new or added lines in 21 files covered. (79.92%)

57201 of 69457 relevant lines covered (82.35%)

15186.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.56
/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2021-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_limiter_schema).
18

19
-include_lib("typerefl/include/types.hrl").
20
-include_lib("hocon/include/hoconsc.hrl").
21

22
-export([
23
    roots/0,
24
    fields/1,
25
    namespace/0,
26
    desc/1
27
]).
28

29
-export([
30
    to_rate/1,
31
    to_capacity/1,
32
    to_burst/1,
33
    to_burst_rate/1,
34
    to_initial/1,
35
    rate_type/0
36
]).
37

38
-export([
39
    mqtt_limiter_names/0
40
]).
41

42
-define(KILOBYTE, 1024).
43

44
-type rate() :: infinity | number().
45
-type burst_rate() :: number().
46
%% this is a compatible type for the deprecated field and type `capacity`.
47
-type burst() :: burst_rate().
48

49
%% the processing strategy after the failure of the token request
50
-typerefl_from_string({rate/0, ?MODULE, to_rate}).
51
-typerefl_from_string({burst_rate/0, ?MODULE, to_burst_rate}).
52
-typerefl_from_string({burst/0, ?MODULE, to_burst}).
53

54
-reflect_type([
55
    rate/0,
56
    burst_rate/0,
57
    burst/0
58
]).
59

60
%%--------------------------------------------------------------------
61
%% schema
62
%%--------------------------------------------------------------------
63

64
namespace() -> limiter.
730✔
65

66
roots() ->
67
    [].
24✔
68

69
fields(mqtt_with_interval) ->
70
    fields(mqtt) ++
4,761✔
71
        [
72
            {alloc_interval,
73
                ?HOCON(emqx_schema:duration_ms(), #{
74
                    desc => ?DESC(alloc_interval),
75
                    default => emqx_limiter:default_alloc_interval(),
76
                    importance => ?IMPORTANCE_LOW
77
                })}
78
        ];
79
fields(mqtt) ->
80
    lists:foldl(fun make_mqtt_limiters_schema/2, [], mqtt_limiter_names()).
106,178✔
81

82
desc(mqtt_with_interval) ->
83
    ?DESC(mqtt);
4,397✔
84
desc(mqtt) ->
NEW
85
    ?DESC(mqtt);
×
86
desc(_) ->
87
    undefined.
×
88

89
%%--------------------------------------------------------------------
90
%% API
91
%%--------------------------------------------------------------------
92
mqtt_limiter_names() ->
93
    [
106,180✔
94
        max_conn,
95
        messages,
96
        bytes
97
    ].
98

99
%%--------------------------------------------------------------------
100
%% Internal functions
101
%%--------------------------------------------------------------------
102
to_rate(Str) ->
103
    to_rate(Str, true, false).
4,371✔
104

105
to_burst_rate(Str) ->
106
    to_rate(Str, false, true).
×
107

108
%% The default value of `capacity` is `infinity`,
109
%% but we have changed `capacity` to `burst` which should not be `infinity`
110
%% and its default value is 0, so we should convert `infinity` to 0
111
to_burst(Str) ->
112
    case to_rate(Str, true, true) of
×
113
        {ok, infinity} ->
114
            {ok, 0};
×
115
        Any ->
116
            Any
×
117
    end.
118

119
%% rate can be: 10 10MB 10MB/s 10MB/2s infinity
120
%% e.g. the bytes_in regex tree is:
121
%%
122
%%        __ infinity
123
%%        |                 - xMB
124
%%  rate -|                 |
125
%%        __ ?Size(/?Time) -|            - xMB/s
126
%%                          |            |
127
%%                          - xMB/?Time -|
128
%%                                       - xMB/ys
129
to_rate(Str, CanInfinity, CanZero) ->
130
    Regex = "^\s*(?:([0-9]+[a-zA-Z]*)(?:/([0-9]*)([m s h d M S H D]{1,2}))?\s*$)|infinity\s*$",
4,371✔
131
    {ok, MP} = re:compile(Regex),
4,371✔
132
    case re:run(Str, MP, [{capture, all_but_first, list}]) of
4,371✔
133
        {match, []} when CanInfinity ->
134
            {ok, infinity};
105✔
135
        %% if time unit is 1s, it can be omitted
136
        {match, [QuotaStr]} ->
137
            Fun = fun(Quota) ->
37✔
138
                {ok, erlang:float(Quota) / 1000}
33✔
139
            end,
140
            to_capacity(QuotaStr, Str, CanZero, Fun);
37✔
141
        {match, [QuotaStr, TimeVal, TimeUnit]} ->
142
            Interval =
4,217✔
143
                case TimeVal of
144
                    %% for xM/s
145
                    [] -> "1" ++ TimeUnit;
4,199✔
146
                    %% for xM/ys
147
                    _ -> TimeVal ++ TimeUnit
18✔
148
                end,
149
            Fun = fun(Quota) ->
4,217✔
150
                try
4,217✔
151
                    case emqx_schema:to_duration_ms(Interval) of
4,217✔
152
                        {ok, Ms} when Ms > 0 ->
153
                            {ok, erlang:float(Quota) / Ms};
4,217✔
154
                        {ok, 0} when CanZero ->
155
                            {ok, 0};
×
156
                        _ ->
157
                            {error, Str}
×
158
                    end
159
                catch
160
                    _:_ ->
161
                        {error, Str}
×
162
                end
163
            end,
164
            to_capacity(QuotaStr, Str, CanZero, Fun);
4,217✔
165
        _ ->
166
            {error, Str}
12✔
167
    end.
168

169
to_capacity(QuotaStr, Str, CanZero, Fun) ->
170
    case to_capacity(QuotaStr) of
4,254✔
171
        {ok, Val} -> check_capacity(Str, Val, CanZero, Fun);
4,252✔
172
        {error, _Error} -> {error, Str}
2✔
173
    end.
174

175
check_capacity(_Str, 0, true, Cont) ->
176
    %% must check the interval part or maybe will get incorrect config, e.g. "0/0sHello"
177
    Cont(0);
×
178
check_capacity(Str, 0, false, _Cont) ->
179
    {error, Str};
2✔
180
check_capacity(_Str, Quota, _CanZero, Cont) ->
181
    Cont(Quota).
4,250✔
182

183
to_capacity(Str) ->
184
    Regex = "^\s*(?:([0-9]+)([a-zA-Z]*))|infinity\s*$",
4,264✔
185
    to_quota(Str, Regex).
4,264✔
186

187
to_initial(Str) ->
188
    Regex = "^\s*([0-9]+)([a-zA-Z]*)\s*$",
×
189
    to_quota(Str, Regex).
×
190

191
to_quota(Str, Regex) ->
192
    {ok, MP} = re:compile(Regex),
4,264✔
193
    try
4,264✔
194
        Result = re:run(Str, MP, [{capture, all_but_first, list}]),
4,264✔
195
        case Result of
4,264✔
196
            {match, [Quota, Unit]} ->
197
                Val = erlang:list_to_integer(Quota),
4,262✔
198
                Unit2 = string:to_lower(Unit),
4,262✔
199
                apply_unit(Unit2, Val);
4,262✔
200
            {match, [Quota, ""]} ->
201
                {ok, erlang:list_to_integer(Quota)};
×
202
            {match, ""} ->
203
                {ok, infinity};
2✔
204
            _ ->
205
                {error, Str}
×
206
        end
207
    catch
208
        _:Error ->
209
            {error, Error}
×
210
    end.
211

212
apply_unit("", Val) -> {ok, Val};
4,246✔
213
apply_unit("kb", Val) -> {ok, Val * ?KILOBYTE};
4✔
214
apply_unit("mb", Val) -> {ok, Val * ?KILOBYTE * ?KILOBYTE};
8✔
215
apply_unit("gb", Val) -> {ok, Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE};
2✔
216
apply_unit(Unit, _) -> {error, "invalid unit:" ++ Unit}.
2✔
217

218
make_mqtt_limiters_schema(Name, Schemas) ->
219
    NameStr = erlang:atom_to_list(Name),
318,534✔
220
    Rate = erlang:list_to_atom(NameStr ++ "_rate"),
318,534✔
221
    Burst = erlang:list_to_atom(NameStr ++ "_burst"),
318,534✔
222
    [
318,534✔
223
        {Rate,
224
            ?HOCON(rate_type(), #{
225
                desc => ?DESC(Rate),
226
                required => false
227
            })},
228
        {Burst,
229
            ?HOCON(burst_rate_type(), #{
230
                desc => ?DESC(burst),
231
                required => false
232
            })}
233
        | Schemas
234
    ].
235

236
rate_type() ->
237
    typerefl:alias("string", rate()).
324,452✔
238

239
burst_rate_type() ->
240
    typerefl:alias("string", burst_rate()).
318,534✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc