• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 12580002997

02 Jan 2025 09:05AM UTC coverage: 82.034%. First build
12580002997

Pull #14475

github

web-flow
Merge 2f431ed61 into d5a56c20b
Pull Request #14475: refactor(limiter): refactor and simplify the limiter

165 of 233 new or added lines in 20 files covered. (70.82%)

56317 of 68651 relevant lines covered (82.03%)

15172.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.5
/apps/emqx/src/emqx_limiter/src/emqx_limiter_allocator.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% Handle limiters for a zone.
18
%% Currently, each allocator corresponds to one zone,
19
%% if there are thousands of limiters, we can change to using allocator pools
20

21
-module(emqx_limiter_allocator).
22

23
-behaviour(gen_server).
24

25
-include_lib("emqx/include/logger.hrl").
26

27
%% gen_server callbacks
28
-export([
29
    init/1,
30
    handle_call/3,
31
    handle_cast/2,
32
    handle_info/2,
33
    terminate/2
34
]).
35

36
-export([
37
    add_bucket/2, add_bucket/3,
38
    delete_bucket/1, delete_bucket/2
39
]).
40

41
-export([start_link/1]).
42

43
-type bucket() :: #{
44
    name := bucket_name(),
45
    rate := rate(),
46
    burst => rate(),
47
    capacity := capacity(),
48
    counter := counters:counters_ref(),
49
    index := index(),
50
    correction := float()
51
}.
52

53
-type allocator_name() :: emqx_limiter:zone() | binary().
54

55
-type state() :: #{
56
    name := allocator_name(),
57
    buckets := buckets(),
58
    counter := counters:counters_ref(),
59
    index := index(),
60
    alloc_interval := millisecond(),
61
    lasttime := millisecond()
62
}.
63

64
-type buckets() :: #{bucket_name() => bucket()}.
65
-type bucket_name() :: atom().
66
-type rate() :: number().
67
-type millisecond() :: non_neg_integer().
68
-type capacity() :: number().
69
-type index() :: pos_integer().
70

71
-define(COUNTER_SIZE, 8).
72
-define(NOW, erlang:system_time(millisecond)).
73

74
-export_type([allocator_name/0, state/0, index/0]).
75

76
%%--------------------------------------------------------------------
77
%% API
78
%%--------------------------------------------------------------------
79

80
-spec start_link(allocator_name()) -> _.
81
start_link(Name) ->
82
    gen_server:start_link({via, emqx_limiter_manager, Name}, ?MODULE, [Name], []).
1,140✔
83

84
add_bucket(Name, Cfg) ->
85
    add_bucket(emqx_limiter:internal_allocator(), Name, Cfg).
96✔
86

87
add_bucket(AllocatorName, Name, Cfg) ->
88
    gen_server:call({via, emqx_limiter_manager, AllocatorName}, {?FUNCTION_NAME, Name, Cfg}).
96✔
89

90
delete_bucket(Name) ->
91
    delete_bucket(emqx_limiter:internal_allocator(), Name).
48✔
92

93
delete_bucket(AllocatorName, Name) ->
94
    gen_server:call({via, emqx_limiter_manager, AllocatorName}, {?FUNCTION_NAME, Name}).
48✔
95

96
%%--------------------------------------------------------------------
97
%%% gen_server callbacks
98
%%--------------------------------------------------------------------
99

100
-spec init([allocator_name()]) -> {ok, State :: state()}.
101
init([Name]) ->
102
    State = init_state(Name),
1,140✔
103
    tick_alloc_event(State),
1,140✔
104
    {ok, State}.
1,140✔
105

106
handle_call(
107
    {add_bucket, Name, #{rate := Rate, burst := Burst}},
108
    _From,
109
    #{
110
        name := AllocatorName,
111
        counter := Counter,
112
        index := Index,
113
        alloc_interval := Interval,
114
        buckets := Buckets
115
    } = State
116
) ->
117
    Bucket = do_create_bucket(Name, Rate, Burst, AllocatorName, Counter, Interval, Index),
96✔
118
    {reply, ok, State#{buckets := Buckets#{Name => Bucket}, index := Index + 1}};
96✔
119
handle_call(
120
    {delete_bucket, Name},
121
    _From,
122
    #{name := AllocatorName, buckets := Buckets} = State
123
) ->
124
    emqx_limiter_manager:delete_bucket(AllocatorName, Name),
48✔
125
    {reply, ok, State#{buckets := maps:remove(Name, Buckets)}};
48✔
126
handle_call(Req, _From, State) ->
NEW
127
    ?SLOG(error, #{msg => "unexpected_call", call => Req}),
×
NEW
128
    {reply, ignored, State}.
×
129

130
handle_cast(Req, State) ->
NEW
131
    ?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
×
NEW
132
    {noreply, State}.
×
133

134
handle_info(tick_alloc_event, State) ->
135
    {noreply, do_alloc(State)};
112,936✔
136
handle_info(Info, State) ->
NEW
137
    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
×
NEW
138
    {noreply, State}.
×
139

140
terminate(_Reason, #{name := Name, buckets := Buckets} = _State) ->
NEW
141
    maps:foreach(
×
142
        fun(LimiterName, _) ->
NEW
143
            emqx_limiter_manager:delete_bucket(Name, LimiterName)
×
144
        end,
145
        Buckets
146
    ),
NEW
147
    ok.
×
148

149
%%--------------------------------------------------------------------
150
%%% Internal functions
151
%%--------------------------------------------------------------------
152
tick_alloc_event(#{alloc_interval := Interval}) ->
153
    erlang:send_after(Interval, self(), ?FUNCTION_NAME).
114,076✔
154

155
%% an allocator server with a zone name is used for this zone, or it is a dynamic server
156
init_state(Name) when is_binary(Name) ->
157
    Counter = counters:new(?COUNTER_SIZE, [write_concurrency]),
1,140✔
158
    #{
1,140✔
159
        name => Name,
160
        counter => Counter,
161
        buckets => #{},
162
        index => 0,
163
        alloc_interval => emqx_limiter:default_alloc_interval(),
164
        lasttime => ?NOW
165
    };
166
init_state(Zone) when is_atom(Zone) ->
NEW
167
    Cfg = emqx_config:get_zone_conf(Zone, [mqtt, limiter]),
×
NEW
168
    init_state(Zone, Cfg).
×
169

170
init_state(Zone, #{alloc_interval := Interval} = Cfg) ->
NEW
171
    Counter = counters:new(?COUNTER_SIZE, [write_concurrency]),
×
NEW
172
    Names = emqx_limiter_schema:mqtt_limiter_names(),
×
NEW
173
    Buckets = init_buckets(Names, Zone, Counter, Cfg, #{}),
×
NEW
174
    #{
×
175
        name => Zone,
176
        counter => Counter,
177
        buckets => Buckets,
178
        index => maps:size(Buckets),
179
        alloc_interval => Interval,
180
        lasttime => ?NOW
181
    }.
182

183
init_buckets([Name | Names], Zone, Counter, #{alloc_interval := Interval} = Cfg, Buckets) ->
NEW
184
    NameStr = erlang:atom_to_list(Name),
×
NEW
185
    {ok, RateKey} = emqx_utils:safe_to_existing_atom(NameStr ++ "_rate"),
×
NEW
186
    {ok, BurstKey} = emqx_utils:safe_to_existing_atom(NameStr ++ "_burst"),
×
NEW
187
    case maps:get(RateKey, Cfg, infinity) of
×
188
        infinity ->
NEW
189
            init_buckets(Names, Zone, Counter, Cfg, Buckets);
×
190
        Rate ->
NEW
191
            Burst = maps:get(BurstKey, Cfg, 0),
×
NEW
192
            Bucket = do_create_bucket(
×
193
                Name, Rate, Burst, Zone, Counter, Interval, maps:size(Buckets)
194
            ),
NEW
195
            init_buckets(Names, Zone, Counter, Cfg, Buckets#{Name => Bucket})
×
196
    end;
197
init_buckets([], _Zone, _Counter, _, Buckets) ->
NEW
198
    Buckets.
×
199

200
%% @doc generate tokens, and then spread to leaf nodes
201
-spec do_alloc(state()) -> state().
202
do_alloc(
203
    #{
204
        lasttime := LastTime,
205
        buckets := Buckets
206
    } = State
207
) ->
208
    tick_alloc_event(State),
112,936✔
209
    Now = ?NOW,
112,936✔
210
    Elapsed = Now - LastTime,
112,936✔
211
    Buckets2 = do_buckets_alloc(Buckets, Elapsed),
112,936✔
212
    State#{
112,936✔
213
        lasttime := Now,
214
        buckets := Buckets2
215
    }.
216

217
do_buckets_alloc(Buckets, Elapsed) ->
218
    maps:map(
112,936✔
219
        fun(_, Bucket) ->
220
            do_bucket_alloc(Bucket, Elapsed)
6,640✔
221
        end,
222
        Buckets
223
    ).
224

225
do_bucket_alloc(#{rate := Rate, correction := Correction} = Bucket, Elapsed) ->
226
    Inc = Rate * Elapsed + Correction,
6,640✔
227
    Inc2 = erlang:floor(Inc),
6,640✔
228
    Correction2 = Inc - Inc2,
6,640✔
229
    add_tokens(Bucket, Inc2),
6,640✔
230
    Bucket#{correction := Correction2}.
6,640✔
231

232
set_tokens(Counter, Ix, Tokens) ->
233
    counters:put(Counter, Ix, erlang:floor(Tokens)).
96✔
234

235
add_tokens(_, 0) ->
236
    ok;
152✔
237
add_tokens(#{counter := Counter, index := Index, capacity := Capacity}, Tokens) ->
238
    Val = counters:get(Counter, Index),
6,488✔
239
    case erlang:min(Capacity, Val + Tokens) - Val of
6,488✔
240
        Inc when Inc > 0 ->
241
            counters:add(Counter, Index, Inc);
52✔
242
        _ ->
243
            ok
6,436✔
244
    end.
245

246
do_create_bucket(
247
    LimiterName,
248
    Rate,
249
    Burst,
250
    AllocatorName,
251
    Counter,
252
    Interval,
253
    Index0
254
) ->
255
    Index = Index0 + 1,
96✔
256
    Capacity = emqx_limiter:calc_capacity(Rate, Interval),
96✔
257

258
    set_tokens(Counter, Index, Capacity),
96✔
259
    Ref = emqx_limiter_bucket_ref:new(Counter, Index),
96✔
260
    emqx_limiter_manager:insert_bucket(AllocatorName, LimiterName, Ref),
96✔
261
    #{
96✔
262
        name => LimiterName,
263
        rate => Rate,
264
        burst => Burst,
265
        capacity => Capacity,
266
        counter => Counter,
267
        index => Index,
268
        correction => 0
269
    }.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc