• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 12583614476

02 Jan 2025 02:04PM UTC coverage: 82.082%. First build
12583614476

Pull #14286

github

web-flow
Merge c9df97f1e into d5a56c20b
Pull Request #14286: Implement node-level authentication/authorization cache

305 of 338 new or added lines in 29 files covered. (90.24%)

56808 of 69209 relevant lines covered (82.08%)

15216.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.78
/apps/emqx_auth/src/emqx_auth_cache.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2024-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_auth_cache).
18

19
-behaviour(gen_server).
20

21
-include_lib("snabbkaffe/include/trace.hrl").
22
-include_lib("stdlib/include/ms_transform.hrl").
23

24
-export([
25
    start_link/3,
26
    child_spec/3,
27
    with_cache/3,
28
    reset/1,
29
    metrics/1
30
]).
31

32
-export([
33
    reset_v1/1,
34
    metrics_v1/1
35
]).
36

37
-export([
38
    init/1,
39
    handle_call/3,
40
    handle_cast/2,
41
    handle_info/2,
42
    terminate/2
43
]).
44

45
%% A single cache entry as stored in the cache ETS table
%% (the table is created with `{keypos, #cache_record.key}`).
-record(cache_record, {
46
    %% Cache key; the ETS key position.
    key :: term(),
47
    %% Cached result returned to callers on a hit.
    value :: term(),
48
    %% Expiry instant: monotonic time in milliseconds plus the configured `cache_ttl`.
    %% NOTE(review): '_' is presumably allowed so the record can be used in match patterns — confirm.
    deadline :: integer() | '_'
49
}).
50

51
%% Key of the single row kept in the stats ETS table.
-define(stat_key, stats).
52
%% Snapshot of the cache table size, refreshed periodically by the server
%% and read when checking the configured limits before an insert.
-record(stats, {
53
    %% Always ?stat_key; the ETS key position of the stats table.
    key :: ?stat_key,
54
    %% Number of objects in the cache table (`ets:info(Tab, size)`).
    count :: non_neg_integer(),
55
    %% Memory used by the cache table in bytes (words * wordsize).
    memory :: non_neg_integer()
56
}).
57

58
-define(pt_key(NAME), {?MODULE, NAME}).
59
-define(unlimited, unlimited).
60

61
-define(DEFAULT_STAT_UPDATE_INTERVAL, 5000).
62
-define(DEFAULT_CLEANUP_INTERVAL, 30000).
63

64
%%--------------------------------------------------------------------
65
%% Metrics
66
%%--------------------------------------------------------------------
67

68
-define(metric_hit, hits).
69
-define(metric_miss, misses).
70
-define(metric_insert, inserts).
71
-define(metric_count, count).
72
-define(metric_memory, memory).
73

74
-define(metric_counters, [?metric_hit, ?metric_miss, ?metric_insert]).
75
-define(metric_gauges, [?metric_count, ?metric_memory]).
76

77
%% For gauges we use only one "virtual" worker
78
-define(worker_id, worker_id).
79

80
%%--------------------------------------------------------------------
81
%% Types
82
%%--------------------------------------------------------------------
83

84
%% A cache key, or a fun that is called to produce the key.
%% The fun is only evaluated when the cache is enabled.
-type cache_key() :: term() | fun(() -> term()).
85
%% Name of a cache instance; used to build the persistent_term key and
%% passed to the metrics worker as the metrics id.
-type name() :: atom().
86
%% Config path whose value is a map holding the cache settings
%% (`enable`, `cache_ttl`, `max_count`, `max_memory`, ...).
-type config_path() :: emqx_config:runtime_config_key_path().
87
%% Computes the value on a cache miss; the tag tells whether the value may be cached.
-type callback() :: fun(() -> {cache | nocache, term()}).
88

89
-type metrics_worker() :: emqx_metrics_worker:handler_name().
90

91
-export_type([
92
    name/0,
93
    cache_key/0
94
]).
95

96
%%--------------------------------------------------------------------
97
%% Messages
98
%%--------------------------------------------------------------------
99

100
%% Timer message: delete expired entries from the cache table.
-record(cleanup, {}).
101
%% Timer message: refresh the size/memory snapshot and gauges.
-record(update_stats, {}).
102

103
%%--------------------------------------------------------------------
104
%% API
105
%%--------------------------------------------------------------------
106

107
%% Start an (unregistered) cache server linked to the calling process.
%% The three arguments are handed to init/1 unchanged.
-spec start_link(name(), config_path(), metrics_worker()) -> {ok, pid()}.
start_link(Name, ConfigPath, MetricsWorker) ->
    InitArgs = [Name, ConfigPath, MetricsWorker],
    gen_server:start_link(?MODULE, InitArgs, []).
451✔
110

111
%% Build a supervisor child specification for one cache instance.
%% The child id embeds the cache name, so several caches can live under
%% the same supervisor.
-spec child_spec(name(), config_path(), metrics_worker()) -> supervisor:child_spec().
child_spec(Name, ConfigPath, MetricsWorker) ->
    StartMFA = {?MODULE, start_link, [Name, ConfigPath, MetricsWorker]},
    #{
        id => {?MODULE, Name},
        start => StartMFA,
        type => worker,
        restart => permanent,
        shutdown => 5000
    }.
120

121
%% Evaluate `Fun` through the cache named `Name`.
%% When the cache is disabled (or not running) `Fun` is simply called and
%% its value returned; otherwise the value is looked up under `Key` first.
-spec with_cache(name(), cache_key(), callback()) -> term().
with_cache(Name, Key, Fun) ->
    Enabled = is_cache_enabled(Name),
    case Enabled of
        {true, PtState} ->
            with_cache_enabled(PtState, Key, Fun);
        false ->
            with_cache_disabled(Fun)
    end.
129

130
%% Drop every entry of the cache named `Name`.
%% A missing persistent term or an already deleted table (both raise
%% `badarg`) is treated as "nothing to reset".
-spec reset(name()) -> ok.
reset(Name) ->
    try
        #{tab := CacheTab} = persistent_term:get(?pt_key(Name)),
        _ = ets:delete_all_objects(CacheTab),
        ok
    catch
        error:badarg ->
            ok
    end.
139

140
%% Return the counters (value + rate) and gauges of the cache named `Name`
%% merged into one map. Raises `{cache_not_found, Name}` when no cache
%% state is registered under that name.
-spec metrics(name()) -> map() | no_return().
metrics(Name) ->
    try persistent_term:get(?pt_key(Name)) of
        #{metrics_worker := MetricsWorker} ->
            Raw = emqx_metrics_worker:get_metrics(MetricsWorker, Name),
            maps:merge(
                fold_counters(Raw, ?metric_counters),
                fold_gauges(Raw, ?metric_gauges)
            )
    catch
        error:badarg ->
            error({cache_not_found, Name})
    end.
152

153
%%--------------------------------------------------------------------
154
%% RPC Targets
155
%%--------------------------------------------------------------------
156

157
%% RPC target (v1): reset the named cache on the local node.
-spec reset_v1(name()) -> ok.
reset_v1(CacheName) ->
    reset(CacheName).
3✔
160

161
%% RPC target (v1): metrics of the named cache, tagged with the node
%% they were collected on.
-spec metrics_v1(name()) -> {node(), map()}.
metrics_v1(CacheName) ->
    ThisNode = node(),
    {ThisNode, metrics(CacheName)}.
5✔
164

165
%%--------------------------------------------------------------------
166
%% gen_server callbacks
167
%%--------------------------------------------------------------------
168

169
%% gen_server init callback.
%% Creates the cache and stats tables, registers the metrics, publishes
%% the shared state in a persistent term and schedules the periodic
%% cleanup / stats-refresh timers.
init([Name, ConfigPath, MetricsWorker]) ->
    %% Trap exits so that terminate/2 is invoked when the supervisor shuts
    %% this worker down (the child spec uses a timeout shutdown). Without
    %% it the persistent term would outlive the process and keep pointing
    %% at deleted ETS tables.
    _ = process_flag(trap_exit, true),
    %% Both tables are public: client processes read and write the cache
    %% directly, and read the stats snapshot when checking limits.
    Tab = ets:new(emqx_node_cache, [
        public,
        ordered_set,
        {keypos, #cache_record.key},
        {read_concurrency, true},
        {write_concurrency, true}
    ]),
    StatTab = ets:new(emqx_node_cache_tab, [
        public, set, {keypos, #stats.key}, {read_concurrency, true}
    ]),
    ok = create_metrics(Name, MetricsWorker),
    PtState = #{
        name => Name,
        tab => Tab,
        stat_tab => StatTab,
        config_path => ConfigPath,
        metrics_worker => MetricsWorker
    },
    %% Populate the stats row before the state becomes visible to clients,
    %% so that the limits check always finds it.
    ok = update_stats(PtState),
    _ = persistent_term:put(?pt_key(Name), PtState),
    _ = erlang:send_after(cleanup_interval(PtState), self(), #cleanup{}),
    _ = erlang:send_after(stat_update_interval(PtState), self(), #update_stats{}),
    %% The server state only keeps the name; everything else is re-read
    %% from the persistent term.
    {ok, #{name => Name}}.
451✔
193

194
%% No synchronous requests are defined: trace the unexpected call and
%% answer `ok` so the caller is not left waiting.
handle_call(Request, _From, State) ->
    ?tp(warning, auth_cache_unkown_call, #{msg => Request}),
    {reply, ok, State}.
×
199

200
%% No casts are defined: trace the unexpected message and carry on.
handle_cast(Request, State) ->
    ?tp(warning, auth_cache_unkown_cast, #{msg => Request}),
    {noreply, State}.
×
205

206
%% Timer-driven housekeeping.
%% Each tick re-reads the shared state, does its work and re-arms its own
%% timer using the interval currently found in the config.
handle_info(#cleanup{} = Tick, State) ->
    PtState = pt_state(State),
    ok = cleanup(PtState),
    _ = erlang:send_after(cleanup_interval(PtState), self(), Tick),
    {noreply, State};
handle_info(#update_stats{} = Tick, State) ->
    PtState = pt_state(State),
    ok = update_stats(PtState),
    _ = erlang:send_after(stat_update_interval(PtState), self(), Tick),
    {noreply, State};
%% Anything else is unexpected: trace it and drop it.
handle_info(Other, State) ->
    ?tp(warning, auth_cache_unkown_info, #{msg => Other}),
    {noreply, State}.
×
221

222
%% Remove the shared state of this cache from persistent_term so that
%% clients stop seeing the cache once the server is gone.
terminate(_Reason, #{name := Name}) ->
    _Existed = persistent_term:erase(?pt_key(Name)),
    ok.
×
225

226
%%--------------------------------------------------------------------
227
%% Internal functions
228
%%--------------------------------------------------------------------
229

230
%% Decide whether the named cache should be used for this request.
%% Returns `{true, PtState}` when the cache state exists and the `enable`
%% config flag is true; `false` when the flag is false or when no state
%% is registered (persistent_term:get/1 raises `badarg`).
is_cache_enabled(Name) ->
    try persistent_term:get(?pt_key(Name)) of
        #{config_path := ConfigPath} = PtState ->
            Enabled = config_value(ConfigPath, enable),
            case Enabled of
                false -> false;
                true -> {true, PtState}
            end
    catch
        error:badarg ->
            false
    end.
240

241
%% Cache bypass: run the callback and return its value, ignoring the
%% cache/nocache tag.
with_cache_disabled(Fun) ->
    Tagged = Fun(),
    dont_cache(Tagged).
570✔
243

244
%% Register this cache's counters with the metrics worker.
%% The counter list is passed twice; presumably the second list selects
%% the metrics for which rates are tracked — confirm against
%% emqx_metrics_worker:create_metrics/4.
create_metrics(Name, MetricsWorker) ->
    Counters = ?metric_counters,
    ok = emqx_metrics_worker:create_metrics(MetricsWorker, Name, Counters, Counters).
248

249
%% Serve a request through the cache.
%%  - hit: count it and return the cached value;
%%  - miss: count it, run the callback and cache the value if allowed;
%%  - table unavailable: run the callback without touching the cache
%%    or the hit/miss counters.
with_cache_enabled(#{tab := Tab} = PtState, KeyOrFun, Fun) ->
    Key = evaluate_key(KeyOrFun),
    case lookup(Tab, Key) of
        not_found ->
            ok = inc_metric(PtState, ?metric_miss),
            Fresh = Fun(),
            maybe_cache(PtState, Key, Fresh);
        {ok, Cached} ->
            ok = inc_metric(PtState, ?metric_hit),
            Cached;
        error ->
            dont_cache(Fun())
    end.
261

262
%% Resolve a cache key: a fun is called to produce the key, any other
%% term is the key itself.
evaluate_key(KeyOrFun) ->
    case is_function(KeyOrFun) of
        true -> KeyOrFun();
        false -> KeyOrFun
    end.
24✔
266

267
%% Increment one of this cache's counters on its metrics worker.
inc_metric(#{metrics_worker := Worker, name := CacheName}, Metric) ->
    ok = emqx_metrics_worker:inc(Worker, CacheName, Metric).
36✔
269

270
%% Set one of this cache's gauges. All gauges are reported under the
%% single "virtual" worker id ?worker_id.
set_gauge(#{metrics_worker := Worker, name := CacheName}, Metric, Value) ->
    ok = emqx_metrics_worker:set_gauge(Worker, CacheName, ?worker_id, Metric, Value).
2,318✔
272

273
%% Delete every cache entry whose deadline lies strictly before the
%% current monotonic time, and emit a trace point with the number removed.
cleanup(#{name := Name, tab := Tab}) ->
    NowMs = now_ms_monotonic(),
    ExpiredMS = ets:fun2ms(
        fun(#cache_record{deadline = Deadline}) when Deadline < NowMs -> true end
    ),
    NumDeleted = ets:select_delete(Tab, ExpiredMS),
    ?tp(info, node_cache_cleanup, #{name => Name, num_deleted => NumDeleted}),
    ok.
44✔
282

283
%% Take a fresh size/memory reading of the cache table, publish it as
%% gauges and store it in the stats table, where the limits check
%% reads it.
update_stats(#{tab := Tab, stat_tab := StatTab, name := Name} = PtState) ->
    #{count := Count, memory := Memory} = tab_stats(Tab),
    ok = set_gauge(PtState, ?metric_count, Count),
    ok = set_gauge(PtState, ?metric_memory, Memory),
    Snapshot = #stats{key = ?stat_key, count = Count, memory = Memory},
    ?tp(info, update_stats, #{name => Name, stats => Snapshot}),
    _ = ets:insert(StatTab, Snapshot),
    ok.
1,159✔
298

299
%% Expiry instant for an entry inserted now: current monotonic time (ms)
%% plus the configured `cache_ttl`.
deadline(ConfigPath) ->
    NowMs = now_ms_monotonic(),
    Ttl = config_value(ConfigPath, cache_ttl),
    NowMs + Ttl.
11✔
301

302
%% Delay between two cleanup runs; falls back to
%% ?DEFAULT_CLEANUP_INTERVAL when `cleanup_interval` is not configured.
cleanup_interval(#{config_path := Path}) ->
    Fallback = ?DEFAULT_CLEANUP_INTERVAL,
    config_value(Path, cleanup_interval, Fallback).
495✔
304

305
%% Delay between two stats refreshes; falls back to
%% ?DEFAULT_STAT_UPDATE_INTERVAL when `stat_update_interval` is not configured.
stat_update_interval(#{config_path := Path}) ->
    Fallback = ?DEFAULT_STAT_UPDATE_INTERVAL,
    config_value(Path, stat_update_interval, Fallback).
1,159✔
307

308
%% Current Erlang monotonic time in milliseconds. Used for deadlines so
%% that wall-clock adjustments do not affect expiry.
now_ms_monotonic() ->
    Unit = millisecond,
    erlang:monotonic_time(Unit).
80✔
310

311
%% Read a mandatory setting from the config map found at `ConfigPath`.
%% Fails with `{badkey, Key}` when the setting is absent.
config_value(ConfigPath, Key) ->
    Config = emqx_config:get(ConfigPath),
    maps:get(Key, Config).
606✔
313

314
%% Read an optional setting from the config map found at `ConfigPath`,
%% returning `Default` when the setting is absent.
config_value(ConfigPath, Key, Default) ->
    Config = emqx_config:get(ConfigPath),
    maps:get(Key, Config, Default).
1,684✔
316

317
%% Look a key up in the cache table.
%%   {ok, Value} - a live entry (deadline still in the future) exists;
%%   not_found   - no entry, or the entry has expired;
%%   error       - the table cannot be accessed (ets:lookup/2 raised badarg).
lookup(Tab, Key) ->
    NowMs = now_ms_monotonic(),
    try ets:lookup(Tab, Key) of
        [#cache_record{deadline = Deadline, value = Value}] when NowMs < Deadline ->
            {ok, Value};
        _AbsentOrExpired ->
            not_found
    catch
        error:badarg ->
            error
    end.
327

328
%% Unwrap a callback result, storing it in the cache first when the
%% callback tagged it as cacheable.
maybe_cache(_PtState, _Key, {nocache, Value}) ->
    Value;
maybe_cache(PtState, Key, {cache, Value}) ->
    ok = maybe_insert(PtState, Key, Value),
    Value.
3✔
333

334
%% Unwrap a callback result without caching it, whatever its tag says.
dont_cache({Tag, Value}) when Tag =:= cache; Tag =:= nocache ->
    Value.
510✔
336

337
%% Size statistics of an ETS table.
%% Returns `#{count => Objects, memory => Bytes}`, or `not_found` when the
%% table does not exist.
%%
%% ets:info/2 signals a missing table by returning `undefined` (it only
%% raises `badarg` for a term that is not a table identifier), so both
%% outcomes are mapped to `not_found`. Multiplying the raw result by the
%% word size first would fail with `badarith` for a deleted table.
tab_stats(Tab) ->
    try {ets:info(Tab, memory), ets:info(Tab, size)} of
        {MemoryWords, Count} when is_integer(MemoryWords), is_integer(Count) ->
            %% ets reports memory in words; convert to bytes.
            #{count => Count, memory => MemoryWords * erlang:system_info(wordsize)};
        _Undefined ->
            %% Table deleted (possibly between the two info calls).
            not_found
    catch
        error:badarg ->
            not_found
    end.
345

346
%% Store a value in the cache unless the configured count/memory limits
%% are already reached. The insert counter is only bumped when an insert
%% is actually attempted.
%% NOTE(review): the trace point below is emitted at `warning` level on
%% every insert attempt and carries the cached value — confirm this level
%% is intended.
maybe_insert(#{tab := Tab, stat_tab := StatTab, config_path := ConfigPath} = PtState, Key, Value) ->
    LimitsReached = limits_reached(ConfigPath, StatTab),
    ?tp(warning, node_cache_insert, #{
        key => Key,
        value => Value,
        limits_reached => LimitsReached
    }),
    case LimitsReached of
        false ->
            ok = inc_metric(PtState, ?metric_insert),
            insert(Tab, Key, Value, ConfigPath);
        true ->
            ok
    end.
360

361
%% Write an entry with a freshly computed deadline into the cache table.
%% Best effort: an inaccessible table (badarg) is silently ignored, the
%% value is simply not cached.
insert(Tab, Key, Value, ConfigPath) ->
    ExpiresAt = deadline(ConfigPath),
    Entry = #cache_record{key = Key, value = Value, deadline = ExpiresAt},
    try ets:insert(Tab, Entry) of
        true ->
            ok
    catch
        error:badarg ->
            ok
    end.
372

373
%% Tell whether the cache must refuse new entries.
%% Compares the last stats snapshot with the configured `max_count` and
%% `max_memory`; a limit that is not an integer (e.g. ?unlimited) never
%% triggers.
%%
%% This runs in the client process after the (possibly slow) user
%% callback, so the cache server and its tables may have gone away in the
%% meantime. Like lookup/2 and insert/4, an inaccessible table must not
%% crash the caller: it is reported as "limits reached", which makes the
%% caller skip the insert.
limits_reached(ConfigPath, StatTab) ->
    MaxCount = config_value(ConfigPath, max_count, ?unlimited),
    MaxMemory = config_value(ConfigPath, max_memory, ?unlimited),
    try ets:lookup(StatTab, ?stat_key) of
        [#stats{count = Count, memory = Memory}] ->
            ?tp(warning, node_cache_limits, #{
                count => Count,
                memory => Memory,
                max_count => MaxCount,
                max_memory => MaxMemory
            }),
            is_limit_exceeded(Count, MaxCount) orelse is_limit_exceeded(Memory, MaxMemory)
    catch
        error:badarg ->
            true
    end.

%% True when `Max` is an integer limit and `Current` has reached it.
is_limit_exceeded(Current, Max) when is_integer(Max) ->
    Current >= Max;
is_limit_exceeded(_Current, _Unlimited) ->
    false.
388

389
%% Fetch the shared cache state (tables, config path, metrics worker)
%% that init/1 published for this server's name.
pt_state(#{name := CacheName}) ->
    persistent_term:get(?pt_key(CacheName)).
752✔
391

392
%%--------------------------------------------------------------------
393
%% Metric helpers
394
%%--------------------------------------------------------------------
395

396
%% Build `#{Counter => #{value => V, rate => R}}` for every requested
%% counter that has both a value (under `counters`) and a rate (under
%% `rate`) in the raw metrics; counters missing either are left out.
fold_counters(RawMetrics, Counters) ->
    Pick = fun(Counter) ->
        case RawMetrics of
            #{counters := #{Counter := Value}, rate := #{Counter := Rate}} ->
                {true, {Counter, #{value => Value, rate => Rate}}};
            _ ->
                false
        end
    end,
    maps:from_list(lists:filtermap(Pick, Counters)).
409

410
%% Build `#{Gauge => Value}` for every requested gauge present under
%% `gauges` in the raw metrics; absent gauges are left out.
fold_gauges(RawMetrics, Gauges) ->
    Pick = fun(Gauge) ->
        case RawMetrics of
            #{gauges := #{Gauge := Value}} ->
                {true, {Gauge, Value}};
            _ ->
                false
        end
    end,
    maps:from_list(lists:filtermap(Pick, Gauges)).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc