• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 12725937429

11 Jan 2025 04:46PM UTC coverage: 82.385%. First build
12725937429

Pull #14286

github

web-flow
Merge 356a7ec02 into 0fc8025be
Pull Request #14286: Implement node-level authentication/authorization cache

321 of 357 new or added lines in 30 files covered. (89.92%)

57681 of 70014 relevant lines covered (82.38%)

15154.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.29
/apps/emqx_auth/src/emqx_auth_cache.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2024-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_auth_cache).
18

19
-behaviour(gen_server).
20

21
-include_lib("snabbkaffe/include/trace.hrl").
22
-include_lib("stdlib/include/ms_transform.hrl").
23

24
-export([
25
    start_link/3,
26
    child_spec/3,
27
    with_cache/3,
28
    reset/1,
29
    metrics/1
30
]).
31

32
-export([
33
    reset_v1/1,
34
    metrics_v1/1
35
]).
36

37
-export([
38
    init/1,
39
    handle_call/3,
40
    handle_cast/2,
41
    handle_info/2,
42
    terminate/2
43
]).
44

45
%% One cached entry, stored in the cache ETS table (keyed on `key`).
%% `expire_at` is a monotonic-clock deadline in milliseconds (see deadline/1);
%% lookup/2 treats an entry as valid only while `expire_at` is in the future,
%% and cleanup/1 deletes entries whose `expire_at` is in the past.
%% NOTE(review): the '_' alternative in the `expire_at` type is presumably
%% there so the record can be used in match patterns — confirm.
-record(cache_record, {
46
    key :: term(),
47
    value :: term(),
48
    expire_at :: integer() | '_'
49
}).
50

51
-define(stat_key, stats).
52
%% Snapshot of the cache table's size, stored as a single row (key is always
%% ?stat_key) in the stat ETS table. Written by update_stats/1 and read by
%% limits_reached/2, so limit checks use the last snapshot rather than
%% querying the cache table on every insert.
-record(stats, {
53
    key :: ?stat_key,
54
    count :: non_neg_integer(),
55
    memory :: non_neg_integer()
56
}).
57

58
%% persistent_term key under which the state map of the cache named NAME
%% (table ids, config path, metrics worker) is published by init/1.
-define(pt_key(NAME), {?MODULE, NAME}).
59
-define(unlimited, unlimited).
60

61
%% Fallback delay, in milliseconds (it is passed to erlang:send_after/3),
%% between stats refreshes when the config has no `stat_update_interval`.
-define(DEFAULT_STAT_UPDATE_INTERVAL, 5000).
62
%% Fallback delay, in milliseconds (it is passed to erlang:send_after/3),
%% between expired-entry sweeps when the config has no `cleanup_interval`.
-define(DEFAULT_CLEANUP_INTERVAL, 30000).
63

64
%%--------------------------------------------------------------------
65
%% Metrics
66
%%--------------------------------------------------------------------
67

68
-define(metric_hit, hits).
69
-define(metric_miss, misses).
70
-define(metric_insert, inserts).
71
-define(metric_count, count).
72
-define(metric_memory, memory).
73

74
-define(metric_counters, [?metric_hit, ?metric_miss, ?metric_insert]).
75
-define(metric_gauges, [?metric_count, ?metric_memory]).
76

77
%% For gauges we use only one "virtual" worker
78
-define(worker_id, worker_id).
79

80
%%--------------------------------------------------------------------
81
%% Types
82
%%--------------------------------------------------------------------
83

84
-type cache_key() :: term() | fun(() -> term()).
85
-type name() :: atom().
86
-type config_path() :: emqx_config:runtime_config_key_path().
87
-type callback() :: fun(() -> {cache | nocache, term()}).
88

89
-type metrics_worker() :: emqx_metrics_worker:handler_name().
90

91
-export_type([
92
    name/0,
93
    cache_key/0
94
]).
95

96
%%--------------------------------------------------------------------
97
%% Messages
98
%%--------------------------------------------------------------------
99

100
-record(cleanup, {}).
101
-record(update_stats, {}).
102

103
%%--------------------------------------------------------------------
104
%% API
105
%%--------------------------------------------------------------------
106

107
%% Starts a cache server process linked to the caller.
%% The process is not registered; callers reach the cache through the
%% persistent_term entry that init/1 publishes under the cache name.
-spec start_link(name(), config_path(), metrics_worker()) -> {ok, pid()}.
start_link(Name, ConfigPath, MetricsWorker) ->
    InitArgs = [Name, ConfigPath, MetricsWorker],
    gen_server:start_link(?MODULE, InitArgs, []).
455✔
110

111
%% Builds a supervisor child specification for a cache server.
%% The child is a permanent worker, identified by {?MODULE, Name}, and is
%% given 5000 ms to shut down.
-spec child_spec(name(), config_path(), metrics_worker()) -> supervisor:child_spec().
child_spec(Name, ConfigPath, MetricsWorker) ->
    StartMFA = {?MODULE, start_link, [Name, ConfigPath, MetricsWorker]},
    #{
        id => {?MODULE, Name},
        start => StartMFA,
        restart => permanent,
        shutdown => 5000,
        type => worker
    }.
120

121
%% Evaluates Fun through the cache named Name.
%% When the cache exists and is enabled in config, the result is looked up
%% (and possibly stored) under Key; otherwise Fun is simply called and its
%% value returned. Fun must return {cache, Value} or {nocache, Value}; only
%% Value is returned to the caller.
-spec with_cache(name(), cache_key(), callback()) -> term().
with_cache(Name, Key, Fun) ->
    case is_cache_enabled(Name) of
        {true, PtState} ->
            with_cache_enabled(PtState, Key, Fun);
        false ->
            with_cache_disabled(Fun)
    end.
129

130
%% Drops every entry of the cache named Name and resets its metrics.
%% A badarg raised while doing so (e.g. the persistent_term entry or the ETS
%% table does not exist) is reported through a warning trace point and the
%% call still returns ok.
-spec reset(name()) -> ok.
reset(Name) ->
    try
        PtState = persistent_term:get(?pt_key(Name)),
        #{tab := Tab, metrics_worker := MetricsWorker} = PtState,
        ets:delete_all_objects(Tab),
        ok = emqx_metrics_worker:reset_metrics(MetricsWorker, Name),
        ?tp(info, auth_cache_reset, #{name => Name, status => ok}),
        ok
    catch
        error:badarg ->
            ?tp(warning, auth_cache_reset, #{name => Name, status => not_found}),
            ok
    end.
143

144
%% Returns the metrics of the cache named Name as one map: each counter maps
%% to #{value, rate}, each available gauge maps to its value.
%% Raises error({cache_not_found, Name}) when no cache is published under
%% that name.
-spec metrics(name()) -> map() | no_return().
metrics(Name) ->
    try persistent_term:get(?pt_key(Name)) of
        #{metrics_worker := Worker} ->
            Raw = emqx_metrics_worker:get_metrics(Worker, Name),
            CounterMetrics = fold_counters(Raw, ?metric_counters),
            GaugeMetrics = fold_gauges(Raw, ?metric_gauges),
            maps:merge(CounterMetrics, GaugeMetrics)
    catch
        error:badarg ->
            error({cache_not_found, Name})
    end.
156

157
%%--------------------------------------------------------------------
158
%% RPC Targets
159
%%--------------------------------------------------------------------
160

161
%% RPC target: resets the cache named Name on the local node.
%% Delegates to reset/1.
-spec reset_v1(name()) -> ok.
reset_v1(CacheName) ->
    reset(CacheName).
3✔
164

165
%% RPC target: returns the local node name paired with the metrics of the
%% cache named Name, so callers can tell which node each result came from.
-spec metrics_v1(name()) -> {node(), map()}.
metrics_v1(Name) ->
    Node = node(),
    {Node, metrics(Name)}.
5✔
168

169
%%--------------------------------------------------------------------
170
%% gen_server callbacks
171
%%--------------------------------------------------------------------
172

173
%% gen_server init callback.
%% Creates the cache table and the stats table, registers the metrics,
%% publishes the cache state under ?pt_key(Name) and schedules the first
%% cleanup and stats-refresh ticks. The gen_server state itself only keeps
%% the cache name; everything else is read back from persistent_term.
init([Name, ConfigPath, MetricsWorker]) ->
    %% Trap exits so that terminate/2 runs when the supervisor shuts this
    %% process down. Without it the process is killed without the callback
    %% being invoked, and the persistent_term entry would be left behind,
    %% pointing at ETS tables that no longer exist.
    _ = process_flag(trap_exit, true),
    Tab = ets:new(emqx_node_cache, [
        public,
        ordered_set,
        {keypos, #cache_record.key},
        {read_concurrency, true},
        {write_concurrency, true}
    ]),
    StatTab = ets:new(emqx_node_cache_tab, [
        public, set, {keypos, #stats.key}, {read_concurrency, true}
    ]),
    ok = create_metrics(Name, MetricsWorker),
    PtState = #{
        name => Name,
        tab => Tab,
        stat_tab => StatTab,
        config_path => ConfigPath,
        metrics_worker => MetricsWorker
    },
    %% Populate the stats row before publishing the state, so readers of the
    %% stats table always find it.
    ok = update_stats(PtState),
    _ = persistent_term:put(?pt_key(Name), PtState),
    _ = erlang:send_after(cleanup_interval(PtState), self(), #cleanup{}),
    _ = erlang:send_after(stat_update_interval(PtState), self(), #update_stats{}),
    {ok, #{name => Name}}.
455✔
197

198
%% gen_server call callback. The server defines no synchronous requests, so
%% every call is reported through a warning trace point and answered `ok`.
handle_call(Request, _From, State) ->
    ?tp(warning, auth_cache_unkown_call, #{msg => Request}),
    {reply, ok, State}.
×
203

204
%% gen_server cast callback. The server defines no casts, so every cast is
%% reported through a warning trace point and otherwise ignored.
handle_cast(Request, State) ->
    ?tp(warning, auth_cache_unkown_cast, #{msg => Request}),
    {noreply, State}.
×
209

210
%% gen_server info callback, driving the two periodic jobs.
%% Each tick does its work first and then re-arms its own timer, re-reading
%% the interval from config every time.
%% Expired-entry sweep.
handle_info(#cleanup{}, State) ->
    CacheState = pt_state(State),
    ok = cleanup(CacheState),
    _ = erlang:send_after(cleanup_interval(CacheState), self(), #cleanup{}),
    {noreply, State};
%% Stats snapshot refresh.
handle_info(#update_stats{}, State) ->
    CacheState = pt_state(State),
    ok = update_stats(CacheState),
    _ = erlang:send_after(stat_update_interval(CacheState), self(), #update_stats{}),
    {noreply, State};
%% Anything else is unexpected: report it and carry on.
handle_info(Info, State) ->
    ?tp(warning, auth_cache_unkown_info, #{msg => Info}),
    {noreply, State}.
×
225

226
%% gen_server terminate callback: unpublishes the cache state so that later
%% with_cache/3 calls see the cache as absent instead of using dead tables.
terminate(_Reason, #{name := CacheName}) ->
    PtKey = ?pt_key(CacheName),
    _ = persistent_term:erase(PtKey),
    ok.
×
229

230
%%--------------------------------------------------------------------
231
%% Internal functions
232
%%--------------------------------------------------------------------
233

234
%% Tells whether the cache named Name can be used right now.
%% Returns {true, PtState} when the cache is published and `enable` is true
%% in its config; returns false when it is disabled or not published at all.
is_cache_enabled(Name) ->
    try persistent_term:get(?pt_key(Name)) of
        #{config_path := ConfigPath} = PtState ->
            case config_value(ConfigPath, enable) of
                false -> false;
                true -> {true, PtState}
            end
    catch
        error:badarg ->
            false
    end.
244

245
%% Cache bypass: runs Fun and returns its value, ignoring the cache/nocache
%% tag.
with_cache_disabled(Fun) ->
    Tagged = Fun(),
    dont_cache(Tagged).
585✔
247

248
%% Registers this cache's counters with the metrics worker.
%% The counter list is passed twice.
%% NOTE(review): the second list is presumably the set of metrics for which
%% rates are tracked (metrics/1 reads a rate for every counter) — confirm
%% against emqx_metrics_worker:create_metrics/4.
create_metrics(Name, MetricsWorker) ->
    Counters = ?metric_counters,
    ok = emqx_metrics_worker:create_metrics(MetricsWorker, Name, Counters, Counters).
252

253
%% Cached evaluation of Fun.
%%   hit          -> count a hit and return the stored value, Fun is not run
%%   miss/expired -> count a miss, run Fun and store its value if allowed
%%   table error  -> run Fun without caching and without touching metrics
with_cache_enabled(#{tab := Tab} = PtState, KeyOrFun, Fun) ->
    Key = evaluate_key(KeyOrFun),
    case lookup(Tab, Key) of
        not_found ->
            ok = inc_metric(PtState, ?metric_miss),
            maybe_cache(PtState, Key, Fun());
        {ok, Cached} ->
            ok = inc_metric(PtState, ?metric_hit),
            Cached;
        error ->
            dont_cache(Fun())
    end.
265

266
%% Resolves a cache key: a fun is called to produce the key (so that building
%% an expensive key can be skipped when the cache is disabled), any other
%% term is the key itself.
evaluate_key(KeyOrFun) ->
    case is_function(KeyOrFun) of
        true -> KeyOrFun();
        false -> KeyOrFun
    end.
24✔
270

271
%% Increments one of this cache's counters by way of the metrics worker.
inc_metric(#{metrics_worker := Worker, name := CacheName}, Counter) ->
    ok = emqx_metrics_worker:inc(Worker, CacheName, Counter).
140✔
273

274
%% Sets one of this cache's gauges. All gauge updates are attributed to the
%% single virtual worker id ?worker_id.
set_gauge(#{metrics_worker := Worker, name := CacheName}, Gauge, NewValue) ->
    ok = emqx_metrics_worker:set_gauge(Worker, CacheName, ?worker_id, Gauge, NewValue).
2,366✔
276

277
%% Deletes every cache entry whose deadline lies strictly before the current
%% monotonic time, and reports how many were removed through a trace point.
cleanup(#{tab := Tab, name := Name}) ->
    NowMs = now_ms_monotonic(),
    ExpiredSpec = ets:fun2ms(
        fun(#cache_record{expire_at = Deadline}) when Deadline < NowMs -> true end
    ),
    Removed = ets:select_delete(Tab, ExpiredSpec),
    ?tp(info, auth_cache_cleanup, #{
        name => Name,
        num_deleted => Removed
    }),
    ok.
43✔
286

287
%% Takes a fresh size snapshot of the cache table, publishes it as the
%% count/memory gauges and stores it in the stats table for limits_reached/2.
update_stats(#{tab := Tab, stat_tab := StatTab, name := Name} = PtState) ->
    #{count := Count, memory := Memory} = tab_stats(Tab),
    ok = set_gauge(PtState, ?metric_count, Count),
    ok = set_gauge(PtState, ?metric_memory, Memory),
    Snapshot = #stats{key = ?stat_key, count = Count, memory = Memory},
    ?tp(info, auth_cache_update_stats, #{
        name => Name,
        stats => Snapshot
    }),
    _ = ets:insert(StatTab, Snapshot),
    ok.
1,183✔
302

303
%% Expiry deadline for an entry inserted now: the current monotonic time in
%% milliseconds plus the configured `cache_ttl`.
deadline(ConfigPath) ->
    NowMs = now_ms_monotonic(),
    Ttl = config_value(ConfigPath, cache_ttl),
    NowMs + Ttl.
51✔
305

306
%% Delay between expired-entry sweeps: the configured `cleanup_interval`, or
%% ?DEFAULT_CLEANUP_INTERVAL when the config does not set one.
cleanup_interval(#{config_path := Path}) ->
    Fallback = ?DEFAULT_CLEANUP_INTERVAL,
    config_value(Path, cleanup_interval, Fallback).
498✔
308

309
%% Delay between stats refreshes: the configured `stat_update_interval`, or
%% ?DEFAULT_STAT_UPDATE_INTERVAL when the config does not set one.
stat_update_interval(#{config_path := Path}) ->
    Fallback = ?DEFAULT_STAT_UPDATE_INTERVAL,
    config_value(Path, stat_update_interval, Fallback).
1,183✔
311

312
%% Current Erlang monotonic time in milliseconds. Used for all expiry
%% arithmetic; the value is only meaningful relative to other readings taken
%% on the same node.
now_ms_monotonic() ->
    Unit = millisecond,
    erlang:monotonic_time(Unit).
183✔
314

315
%% Reads a mandatory setting from the config map found at ConfigPath.
%% Fails (via maps:get/2) when the key is absent.
config_value(ConfigPath, Key) ->
    Config = emqx_config:get(ConfigPath),
    maps:get(Key, Config).
725✔
317

318
%% Reads an optional setting from the config map found at ConfigPath,
%% returning Default when the key is absent.
config_value(ConfigPath, Key, Default) ->
    Config = emqx_config:get(ConfigPath),
    maps:get(Key, Config, Default).
1,791✔
320

321
%% Fetches a live entry from the cache table.
%%   {ok, Value} -> entry present and its deadline is still in the future
%%   not_found   -> no entry, or the entry has expired
%%   error       -> the table could not be read (ets:lookup raised badarg)
lookup(Tab, Key) ->
    NowMs = now_ms_monotonic(),
    try ets:lookup(Tab, Key) of
        [#cache_record{expire_at = Deadline, value = Cached}] when Deadline > NowMs ->
            {ok, Cached};
        _Other ->
            not_found
    catch
        error:badarg ->
            error
    end.
331

332
%% Unwraps a callback result, storing it first when the callback asked for
%% caching. Whether it is really stored is decided by maybe_insert/3.
maybe_cache(_PtState, _Key, {nocache, Result}) ->
    Result;
maybe_cache(PtState, Key, {cache, Result}) ->
    ok = maybe_insert(PtState, Key, Result),
    Result.
9✔
337

338
%% Unwraps a callback result without storing anything, whatever its tag.
dont_cache({Tag, Value}) when Tag =:= cache; Tag =:= nocache ->
    Value.
523✔
340

341
%% Size snapshot of an ETS table: #{count => Entries, memory => Bytes}.
%% ets:info/2 reports memory in words, so it is scaled by the word size.
%% ets:info/2 returns `undefined` (it does not raise) when the table no
%% longer exists; in that case an all-zero snapshot is returned so callers
%% can always match on the map.
tab_stats(Tab) ->
    case {ets:info(Tab, size), ets:info(Tab, memory)} of
        {Count, Words} when is_integer(Count), is_integer(Words) ->
            #{count => Count, memory => Words * erlang:system_info(wordsize)};
        _TableGone ->
            #{count => 0, memory => 0}
    end.
349

350
%% Stores Value under Key unless a configured size limit has been reached.
%% The decision is reported through a trace point; the insert counter is
%% bumped only when an insert is attempted. Always returns ok.
maybe_insert(#{tab := Tab, stat_tab := StatTab, config_path := ConfigPath} = PtState, Key, Value) ->
    Full = limits_reached(ConfigPath, StatTab),
    ?tp(auth_cache_insert, #{
        key => Key,
        value => Value,
        limits_reached => Full
    }),
    case Full of
        false ->
            ok = inc_metric(PtState, ?metric_insert),
            insert(Tab, Key, Value, ConfigPath);
        true ->
            ok
    end.
364

365
%% Writes an entry with a fresh deadline into the cache table.
%% Best effort: if the table cannot be written (ets:insert raises badarg)
%% the value is simply not cached. Always returns ok.
insert(Tab, Key, Value, ConfigPath) ->
    ExpireAt = deadline(ConfigPath),
    Entry = #cache_record{key = Key, value = Value, expire_at = ExpireAt},
    try ets:insert(Tab, Entry) of
        true -> ok
    catch
        error:badarg -> ok
    end.
376

377
%% Tells whether the cache must refuse new entries.
%% Compares the last stats snapshot with the configured `max_count` and
%% `max_memory`; a limit that is not an integer (e.g. ?unlimited) never
%% triggers. Returns true as soon as either limit is met or exceeded.
%% If the stats table cannot be read (ets:lookup raises badarg) the function
%% returns true, so the caller skips the insert instead of crashing; this
%% mirrors how lookup/2 and insert/4 tolerate an unavailable cache table.
limits_reached(ConfigPath, StatTab) ->
    MaxCount = config_value(ConfigPath, max_count, ?unlimited),
    MaxMemory = config_value(ConfigPath, max_memory, ?unlimited),
    try ets:lookup(StatTab, ?stat_key) of
        [#stats{count = Count, memory = Memory}] ->
            ?tp(auth_cache_limits, #{
                count => Count,
                memory => Memory,
                max_count => MaxCount,
                max_memory => MaxMemory
            }),
            (is_integer(MaxCount) andalso Count >= MaxCount) orelse
                (is_integer(MaxMemory) andalso Memory >= MaxMemory)
    catch
        error:badarg ->
            true
    end.
392

393
%% Fetches the published cache state for the cache this server owns, using
%% the name kept in the gen_server state.
pt_state(#{name := CacheName}) ->
    PtKey = ?pt_key(CacheName),
    persistent_term:get(PtKey).
771✔
395

396
%%--------------------------------------------------------------------
397
%% Metric helpers
398
%%--------------------------------------------------------------------
399

400
%% Builds #{Counter => #{value => V, rate => R}} for every listed counter
%% that has both a value under `counters` and a rate under `rate` in
%% RawMetrics. Counters missing either part are left out.
fold_counters(RawMetrics, Counters) ->
    Pick = fun(Counter) ->
        case RawMetrics of
            #{counters := #{Counter := Value}, rate := #{Counter := Rate}} ->
                {true, {Counter, #{value => Value, rate => Rate}}};
            _ ->
                false
        end
    end,
    maps:from_list(lists:filtermap(Pick, Counters)).
413

414
%% Builds #{Gauge => Value} for every listed gauge that has a value under
%% `gauges` in RawMetrics. Gauges without a value are left out.
fold_gauges(RawMetrics, Gauges) ->
    Pick = fun(Gauge) ->
        case RawMetrics of
            #{gauges := #{Gauge := Value}} ->
                {true, {Gauge, Value}};
            _ ->
                false
        end
    end,
    maps:from_list(lists:filtermap(Pick, Gauges)).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc