• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 12741272085

13 Jan 2025 05:33AM UTC coverage: 82.355%. First build
12741272085

Pull #14475

github

web-flow
Merge b56877530 into 0fc8025be
Pull Request #14475: refactor(limiter): refactor and simplify the limiter

199 of 249 new or added lines in 21 files covered. (79.92%)

57201 of 69457 relevant lines covered (82.35%)

15186.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.22
/apps/emqx/src/emqx_limiter/src/emqx_limiter_allocator.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2021-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% Handle limiters for a zone.
18
%% Currently, each allocator corresponds to one zone,
19
%% if there are thousands of limiters, we can change to using allocator pools
20

21
-module(emqx_limiter_allocator).
22

23
-behaviour(gen_server).
24

25
-include_lib("emqx/include/logger.hrl").
26

27
%% gen_server callbacks
28
-export([
29
    init/1,
30
    handle_call/3,
31
    handle_cast/2,
32
    handle_info/2,
33
    terminate/2
34
]).
35

36
-export([
37
    add_bucket/2, add_bucket/3,
38
    delete_bucket/1, delete_bucket/2
39
]).
40

41
-export([start_link/1]).
42

43
-type bucket() :: #{
44
    name := bucket_name(),
45
    rate := rate(),
46
    burst => rate(),
47
    capacity := capacity(),
48
    counter := counters:counters_ref(),
49
    index := index(),
50
    correction := float()
51
}.
52

53
-type allocator_name() :: emqx_limiter:zone() | binary().
54

55
-type state() :: #{
56
    name := allocator_name(),
57
    buckets := buckets(),
58
    counter := counters:counters_ref(),
59
    index := index(),
60
    alloc_interval := millisecond(),
61
    lasttime := millisecond(),
62
    free_index := [index()]
63
}.
64

65
-type buckets() :: #{bucket_name() => bucket()}.
66
-type bucket_name() :: atom().
67
-type rate() :: number().
68
-type millisecond() :: non_neg_integer().
69
-type capacity() :: number().
70
-type index() :: pos_integer().
71

72
-define(COUNTER_SIZE, 8).
73
-define(NOW, erlang:system_time(millisecond)).
74

75
-export_type([allocator_name/0, state/0, index/0]).
76

77
%%--------------------------------------------------------------------
78
%% API
79
%%--------------------------------------------------------------------
80

81
-spec start_link(allocator_name()) -> _.
82
start_link(Name) ->
83
    gen_server:start_link({via, emqx_limiter_manager, Name}, ?MODULE, [Name], []).
1,179✔
84

85
add_bucket(Name, Cfg) ->
86
    add_bucket(emqx_limiter:internal_allocator(), Name, Cfg).
191✔
87

88
add_bucket(AllocatorName, Name, Cfg) ->
89
    gen_server:call({via, emqx_limiter_manager, AllocatorName}, {?FUNCTION_NAME, Name, Cfg}).
191✔
90

91
delete_bucket(Name) ->
92
    delete_bucket(emqx_limiter:internal_allocator(), Name).
143✔
93

94
delete_bucket(AllocatorName, Name) ->
95
    gen_server:call({via, emqx_limiter_manager, AllocatorName}, {?FUNCTION_NAME, Name}).
143✔
96

97
%%--------------------------------------------------------------------
98
%%% gen_server callbacks
99
%%--------------------------------------------------------------------
100

101
-spec init([allocator_name()]) -> {ok, State :: state()}.
102
init([Name]) ->
103
    State = init_state(Name),
1,179✔
104
    tick_alloc_event(State),
1,179✔
105
    {ok, State}.
1,179✔
106

107
handle_call(
108
    {add_bucket, Name, #{rate := Rate, burst := Burst}},
109
    _From,
110
    #{
111
        name := AllocatorName,
112
        counter := Counter,
113
        index := Index,
114
        alloc_interval := Interval,
115
        buckets := Buckets,
116
        free_index := Free
117
    } = State
118
) ->
119
    case Free of
191✔
120
        [ToUse | Free2] ->
121
            NewIndex = Index;
105✔
122
        [] ->
123
            ToUse = Index + 1,
86✔
124
            NewIndex = ToUse,
86✔
125
            Free2 = Free
86✔
126
    end,
127

128
    case ToUse > ?COUNTER_SIZE of
191✔
129
        false ->
130
            Bucket = do_create_bucket(
191✔
131
                Name, Rate, Burst, AllocatorName, Counter, Interval, ToUse
132
            ),
133

134
            {reply, ok, State#{
191✔
135
                buckets := Buckets#{Name => Bucket},
136
                index := NewIndex,
137
                free_index := Free2
138
            }};
139
        _ ->
NEW
140
            {reply, {error, <<"exceeded_max_index">>}, State}
×
141
    end;
142
handle_call(
143
    {delete_bucket, Name},
144
    _From,
145
    #{name := AllocatorName, buckets := Buckets, free_index := Free} = State
146
) ->
147
    case maps:take(Name, Buckets) of
143✔
148
        error ->
149
            {reply, ok, State};
1✔
150
        {#{index := Index}, Buckets2} ->
151
            emqx_limiter_manager:delete_bucket(AllocatorName, Name),
142✔
152
            {reply, ok, State#{buckets := Buckets2, free_index := [Index | Free]}}
142✔
153
    end;
154
handle_call(Req, _From, State) ->
NEW
155
    ?SLOG(error, #{msg => "emqx_limiter_allocator_unexpected_call", call => Req}),
×
NEW
156
    {reply, ignored, State}.
×
157

158
handle_cast(Req, State) ->
NEW
159
    ?SLOG(error, #{msg => "emqx_limiter_allocator_unexpected_cast", cast => Req}),
×
NEW
160
    {noreply, State}.
×
161

162
handle_info(tick_alloc_event, State) ->
163
    {noreply, do_alloc(State)};
114,153✔
164
handle_info(Info, State) ->
NEW
165
    ?SLOG(error, #{msg => "emqx_limiter_allocator_unexpected_info", info => Info}),
×
NEW
166
    {noreply, State}.
×
167

168
terminate(_Reason, #{name := Name, buckets := Buckets} = _State) ->
NEW
169
    maps:foreach(
×
170
        fun(LimiterName, _) ->
NEW
171
            emqx_limiter_manager:delete_bucket(Name, LimiterName)
×
172
        end,
173
        Buckets
174
    ),
NEW
175
    ok.
×
176

177
%%--------------------------------------------------------------------
178
%%% Internal functions
179
%%--------------------------------------------------------------------
180
tick_alloc_event(#{alloc_interval := Interval}) ->
181
    erlang:send_after(Interval, self(), ?FUNCTION_NAME).
115,332✔
182

183
%% an allocator server with a zone name is used for this zone, or it is a dynamic server
184
init_state(Name) when is_binary(Name) ->
185
    Counter = counters:new(?COUNTER_SIZE, [write_concurrency]),
1,179✔
186
    #{
1,179✔
187
        name => Name,
188
        counter => Counter,
189
        buckets => #{},
190
        index => 0,
191
        alloc_interval => emqx_limiter:default_alloc_interval(),
192
        lasttime => ?NOW,
193
        free_index => []
194
    };
195
init_state(Zone) when is_atom(Zone) ->
NEW
196
    Cfg = emqx_config:get_zone_conf(Zone, [mqtt, limiter]),
×
NEW
197
    init_state(Zone, Cfg).
×
198

199
init_state(Zone, #{alloc_interval := Interval} = Cfg) ->
NEW
200
    Counter = counters:new(?COUNTER_SIZE, [write_concurrency]),
×
NEW
201
    Names = emqx_limiter_schema:mqtt_limiter_names(),
×
NEW
202
    Buckets = init_buckets(Names, Zone, Counter, Cfg, #{}),
×
NEW
203
    #{
×
204
        name => Zone,
205
        counter => Counter,
206
        buckets => Buckets,
207
        index => maps:size(Buckets),
208
        alloc_interval => Interval,
209
        lasttime => ?NOW,
210
        free_index => []
211
    }.
212

213
init_buckets([Name | Names], Zone, Counter, #{alloc_interval := Interval} = Cfg, Buckets) ->
NEW
214
    {ok, RateKey} = emqx_limiter:to_rate_key(Name),
×
NEW
215
    {ok, BurstKey} = emqx_limiter:to_burst_key(Name),
×
NEW
216
    case maps:get(RateKey, Cfg, infinity) of
×
217
        infinity ->
NEW
218
            init_buckets(Names, Zone, Counter, Cfg, Buckets);
×
219
        Rate ->
NEW
220
            Burst = maps:get(BurstKey, Cfg, 0),
×
NEW
221
            Bucket = do_create_bucket(
×
222
                Name, Rate, Burst, Zone, Counter, Interval, maps:size(Buckets) + 1
223
            ),
NEW
224
            init_buckets(Names, Zone, Counter, Cfg, Buckets#{Name => Bucket})
×
225
    end;
226
init_buckets([], _Zone, _Counter, _, Buckets) ->
NEW
227
    Buckets.
×
228

229
%% @doc generate tokens, and then spread to leaf nodes
230
-spec do_alloc(state()) -> state().
231
do_alloc(
232
    #{
233
        lasttime := LastTime,
234
        buckets := Buckets
235
    } = State
236
) ->
237
    tick_alloc_event(State),
114,153✔
238
    Now = ?NOW,
114,153✔
239
    Elapsed = Now - LastTime,
114,153✔
240
    Buckets2 = do_buckets_alloc(Buckets, Elapsed),
114,153✔
241
    State#{
114,153✔
242
        lasttime := Now,
243
        buckets := Buckets2
244
    }.
245

246
do_buckets_alloc(Buckets, Elapsed) ->
247
    maps:map(
114,153✔
248
        fun(_, Bucket) ->
249
            do_bucket_alloc(Bucket, Elapsed)
6,973✔
250
        end,
251
        Buckets
252
    ).
253

254
do_bucket_alloc(
255
    #{
256
        rate := Rate,
257
        correction := Correction,
258
        counter := Counter,
259
        index := Index,
260
        capacity := Capacity
261
    } = Bucket,
262
    Elapsed
263
) ->
264
    Val = counters:get(Counter, Index),
6,973✔
265
    case Val >= Capacity of
6,973✔
266
        true ->
267
            Bucket;
6,874✔
268
        _ ->
269
            Inc = Rate * Elapsed + Correction,
99✔
270
            Inc2 = erlang:floor(Inc),
99✔
271
            Correction2 = Inc - Inc2,
99✔
272
            add_tokens(Bucket, Inc2),
99✔
273
            Bucket#{correction := Correction2}
99✔
274
    end.
275

276
set_tokens(Counter, Ix, Tokens) ->
277
    counters:put(Counter, Ix, erlang:floor(Tokens)).
191✔
278

279
add_tokens(_, 0) ->
280
    ok;
60✔
281
add_tokens(#{counter := Counter, index := Index, capacity := Capacity}, Tokens) ->
282
    Val = counters:get(Counter, Index),
39✔
283
    case erlang:min(Capacity, Val + Tokens) - Val of
39✔
284
        Inc when Inc > 0 ->
285
            counters:put(Counter, Index, Inc + Val);
39✔
286
        _ ->
NEW
287
            ok
×
288
    end.
289

290
do_create_bucket(
291
    LimiterName,
292
    Rate,
293
    Burst,
294
    AllocatorName,
295
    Counter,
296
    Interval,
297
    Index
298
) ->
299
    Capacity = emqx_limiter:calc_capacity(Rate, Interval),
191✔
300

301
    set_tokens(Counter, Index, Capacity),
191✔
302
    Ref = emqx_limiter_bucket_ref:new(Counter, Index),
191✔
303
    emqx_limiter_manager:insert_bucket(AllocatorName, LimiterName, Ref),
191✔
304
    #{
191✔
305
        name => LimiterName,
306
        rate => Rate,
307
        burst => Burst,
308
        capacity => Capacity,
309
        counter => Counter,
310
        index => Index,
311
        correction => 0
312
    }.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc