• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 12589698593

02 Jan 2025 10:32PM UTC coverage: 82.088%. First build
12589698593

Pull #14286

github

web-flow
Merge 7dc4c3213 into d5a56c20b
Pull Request #14286: Implement node-level authentication/authorization cache

304 of 338 new or added lines in 29 files covered. (89.94%)

56812 of 69209 relevant lines covered (82.09%)

15271.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.95
/apps/emqx_auth/src/emqx_auth_cache.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2024-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_auth_cache).
18

19
-behaviour(gen_server).
20

21
-include_lib("snabbkaffe/include/trace.hrl").
22
-include_lib("stdlib/include/ms_transform.hrl").
23

24
-export([
25
    start_link/3,
26
    child_spec/3,
27
    with_cache/3,
28
    reset/1,
29
    metrics/1
30
]).
31

32
-export([
33
    reset_v1/1,
34
    metrics_v1/1
35
]).
36

37
-export([
38
    init/1,
39
    handle_call/3,
40
    handle_cast/2,
41
    handle_info/2,
42
    terminate/2
43
]).
44

45
-record(cache_record, {
46
    key :: term(),
47
    value :: term(),
48
    deadline :: integer() | '_'
49
}).
50

51
-define(stat_key, stats).
52
-record(stats, {
53
    key :: ?stat_key,
54
    count :: non_neg_integer(),
55
    memory :: non_neg_integer()
56
}).
57

58
-define(pt_key(NAME), {?MODULE, NAME}).
59
-define(unlimited, unlimited).
60

61
-define(DEFAULT_STAT_UPDATE_INTERVAL, 5000).
62
-define(DEFAULT_CLEANUP_INTERVAL, 30000).
63

64
%%--------------------------------------------------------------------
65
%% Metrics
66
%%--------------------------------------------------------------------
67

68
-define(metric_hit, hits).
69
-define(metric_miss, misses).
70
-define(metric_insert, inserts).
71
-define(metric_count, count).
72
-define(metric_memory, memory).
73

74
-define(metric_counters, [?metric_hit, ?metric_miss, ?metric_insert]).
75
-define(metric_gauges, [?metric_count, ?metric_memory]).
76

77
%% For gauges we use only one "virtual" worker
78
-define(worker_id, worker_id).
79

80
%%--------------------------------------------------------------------
81
%% Types
82
%%--------------------------------------------------------------------
83

84
-type cache_key() :: term() | fun(() -> term()).
85
-type name() :: atom().
86
-type config_path() :: emqx_config:runtime_config_key_path().
87
-type callback() :: fun(() -> {cache | nocache, term()}).
88

89
-type metrics_worker() :: emqx_metrics_worker:handler_name().
90

91
-export_type([
92
    name/0,
93
    cache_key/0
94
]).
95

96
%%--------------------------------------------------------------------
97
%% Messages
98
%%--------------------------------------------------------------------
99

100
-record(cleanup, {}).
101
-record(update_stats, {}).
102

103
%%--------------------------------------------------------------------
104
%% API
105
%%--------------------------------------------------------------------
106

107
%% @doc Start the (unregistered) cache owner process linked to the caller.
%% The three arguments are forwarded to `init/1' as a single list.
-spec start_link(name(), config_path(), metrics_worker()) -> {ok, pid()}.
start_link(Name, ConfigPath, MetricsWorker) ->
    InitArgs = [Name, ConfigPath, MetricsWorker],
    gen_server:start_link(?MODULE, InitArgs, []).
110

111
%% @doc Build a supervisor child spec for one named cache.
%% The child id is `{?MODULE, Name}', so several caches can live under the
%% same supervisor as long as their names differ.
-spec child_spec(name(), config_path(), metrics_worker()) -> supervisor:child_spec().
child_spec(Name, ConfigPath, MetricsWorker) ->
    ChildId = {?MODULE, Name},
    StartMFA = {?MODULE, start_link, [Name, ConfigPath, MetricsWorker]},
    #{
        id => ChildId,
        start => StartMFA,
        restart => permanent,
        shutdown => 5000,
        type => worker
    }.
120

121
%% @doc Evaluate `Fun' through the cache called `Name'.
%% When the cache is enabled, the result is looked up / stored under `Key'
%% (which may itself be a fun producing the key). Otherwise `Fun' is simply
%% evaluated and its value returned without touching the cache.
-spec with_cache(name(), cache_key(), callback()) -> term().
with_cache(Name, Key, Fun) ->
    case is_cache_enabled(Name) of
        {true, PtState} ->
            with_cache_enabled(PtState, Key, Fun);
        false ->
            with_cache_disabled(Fun)
    end.
129

130
%% @doc Drop every entry of the cache called `Name'.
%% A `badarg' raised because the persistent term or the table does not
%% exist is swallowed, so resetting an unknown or stopped cache is a no-op.
-spec reset(name()) -> ok.
reset(Name) ->
    PtKey = ?pt_key(Name),
    try
        #{tab := CacheTab} = persistent_term:get(PtKey),
        _ = ets:delete_all_objects(CacheTab),
        ok
    catch
        error:badarg -> ok
    end.
139

140
%% @doc Return the metrics of the cache called `Name' as one flat map:
%% counters as `#{value => _, rate => _}' and gauges as plain values.
%% Raises `{cache_not_found, Name}' when no persistent term is registered
%% for the cache.
-spec metrics(name()) -> map() | no_return().
metrics(Name) ->
    try persistent_term:get(?pt_key(Name)) of
        #{metrics_worker := Worker} ->
            Raw = emqx_metrics_worker:get_metrics(Worker, Name),
            CounterPart = fold_counters(Raw, ?metric_counters),
            GaugePart = fold_gauges(Raw, ?metric_gauges),
            maps:merge(CounterPart, GaugePart)
    catch
        error:badarg ->
            error({cache_not_found, Name})
    end.
152

153
%%--------------------------------------------------------------------
154
%% RPC Targets
155
%%--------------------------------------------------------------------
156

157
%% @doc RPC target: reset the cache called `Name' on the local node.
%% Thin, versioned wrapper around `reset/1'.
-spec reset_v1(name()) -> ok.
reset_v1(Name) ->
    ok = reset(Name).
160

161
%% @doc RPC target: return the local metrics of cache `Name', tagged with
%% the node they were collected on.
-spec metrics_v1(name()) -> {node(), map()}.
metrics_v1(Name) ->
    Node = node(),
    {Node, metrics(Name)}.
164

165
%%--------------------------------------------------------------------
166
%% gen_server callbacks
167
%%--------------------------------------------------------------------
168

169
%% gen_server init callback.
%%
%% Creates the cache table and the stats table (both public, owned by this
%% process), registers the metrics, publishes the shared state under a
%% persistent term so that client processes can reach the tables without
%% calling this server, and arms the periodic cleanup / stats timers.
init([Name, ConfigPath, MetricsWorker]) ->
    %% Trap exits so that `terminate/2' is invoked when the supervisor shuts
    %% this process down; otherwise the persistent term published below would
    %% outlive the tables it points to.
    _ = process_flag(trap_exit, true),
    Tab = ets:new(emqx_node_cache, [
        public,
        ordered_set,
        {keypos, #cache_record.key},
        {read_concurrency, true},
        {write_concurrency, true}
    ]),
    StatTab = ets:new(emqx_node_cache_tab, [
        public, set, {keypos, #stats.key}, {read_concurrency, true}
    ]),
    ok = create_metrics(Name, MetricsWorker),
    PtState = #{
        name => Name,
        tab => Tab,
        stat_tab => StatTab,
        config_path => ConfigPath,
        metrics_worker => MetricsWorker
    },
    %% Seed the stats row before publishing the state: readers expect exactly
    %% one stats record to be present in the stats table.
    ok = update_stats(PtState),
    _ = persistent_term:put(?pt_key(Name), PtState),
    _ = erlang:send_after(cleanup_interval(PtState), self(), #cleanup{}),
    _ = erlang:send_after(stat_update_interval(PtState), self(), #update_stats{}),
    {ok, #{name => Name}}.
193

194
%% No synchronous requests are defined for this server: trace the unexpected
%% call and reply `ok' so that the caller is not left blocked.
handle_call(Msg, _From, State) ->
    ?tp(warning, auth_cache_unknown_call, #{
        msg => Msg
    }),
    {reply, ok, State}.
199

200
%% No casts are defined for this server: trace the unexpected message and
%% carry on with the state unchanged.
handle_cast(Msg, State) ->
    ?tp(warning, auth_cache_unknown_cast, #{
        msg => Msg
    }),
    {noreply, State}.
205

206
%% Periodic housekeeping.
%%
%% `#cleanup{}' purges expired cache entries, `#update_stats{}' refreshes the
%% size/memory snapshot. Each handler re-arms its own timer, reading the
%% interval again so that a changed interval value is picked up on the next
%% tick. Any other message is traced and dropped.
handle_info(#cleanup{}, State) ->
    PtState = pt_state(State),
    ok = cleanup(PtState),
    _ = erlang:send_after(cleanup_interval(PtState), self(), #cleanup{}),
    {noreply, State};
handle_info(#update_stats{}, State) ->
    PtState = pt_state(State),
    ok = update_stats(PtState),
    _ = erlang:send_after(stat_update_interval(PtState), self(), #update_stats{}),
    {noreply, State};
handle_info(Msg, State) ->
    ?tp(warning, auth_cache_unknown_info, #{
        msg => Msg
    }),
    {noreply, State}.
221

222
%% Remove the published persistent term for this cache so that client
%% processes stop seeing the state of a terminated server.
terminate(_Reason, #{name := Name}) ->
    PtKey = ?pt_key(Name),
    _ = persistent_term:erase(PtKey),
    ok.
225

226
%%--------------------------------------------------------------------
227
%% Internal functions
228
%%--------------------------------------------------------------------
229

230
%% Decide whether the cache called `Name' may be used right now.
%% Returns `{true, PtState}' when the cache is registered and its `enable'
%% config flag is `true'; returns `false' when the flag is `false' or when
%% no persistent term exists for the cache.
is_cache_enabled(Name) ->
    try persistent_term:get(?pt_key(Name)) of
        #{config_path := ConfigPath} = PtState ->
            IsEnabled = config_value(ConfigPath, enable),
            case IsEnabled of
                false -> false;
                true -> {true, PtState}
            end
    catch
        error:badarg -> false
    end.
240

241
%% Cache bypass: evaluate the callback and return the bare value,
%% ignoring its cache/nocache tag.
with_cache_disabled(Fun) ->
    Tagged = Fun(),
    dont_cache(Tagged).
243

244
%% Register this cache's counters with the metrics worker. The same list is
%% passed for both metric arguments of `emqx_metrics_worker:create_metrics/4'.
create_metrics(Name, MetricsWorker) ->
    Counters = ?metric_counters,
    ok = emqx_metrics_worker:create_metrics(MetricsWorker, Name, Counters, Counters).
248

249
%% Cached evaluation path.
%%   * hit       -> bump the hit counter and return the stored value;
%%   * miss      -> bump the miss counter, run the callback and store the
%%                  result if the callback asked for it;
%%   * `error'   -> the table could not be read: run the callback and return
%%                  its value without caching or counting.
with_cache_enabled(#{tab := Tab} = PtState, KeyOrFun, Fun) ->
    CacheKey = evaluate_key(KeyOrFun),
    case lookup(Tab, CacheKey) of
        error ->
            dont_cache(Fun());
        not_found ->
            ok = inc_metric(PtState, ?metric_miss),
            Fresh = Fun(),
            maybe_cache(PtState, CacheKey, Fresh);
        {ok, Cached} ->
            ok = inc_metric(PtState, ?metric_hit),
            Cached
    end.
261

262
%% Resolve a cache key: a fun is called (with no arguments) to produce the
%% key lazily, anything else is already the key.
evaluate_key(KeyOrFun) ->
    case is_function(KeyOrFun) of
        true -> KeyOrFun();
        false -> KeyOrFun
    end.
266

267
%% Increment the counter `Metric' of this cache in its metrics worker.
inc_metric(PtState, Metric) ->
    #{name := Name, metrics_worker := Worker} = PtState,
    ok = emqx_metrics_worker:inc(Worker, Name, Metric).
269

270
%% Set the gauge `Metric' of this cache to `Value'. All gauges are reported
%% under the single virtual worker id `?worker_id'.
set_gauge(PtState, Metric, Value) ->
    #{name := Name, metrics_worker := Worker} = PtState,
    ok = emqx_metrics_worker:set_gauge(Worker, Name, ?worker_id, Metric, Value).
272

273
%% Delete every cache record whose deadline lies strictly before the current
%% monotonic time, and emit a trace point with the number of removed records.
cleanup(#{name := Name, tab := Tab}) ->
    Now = now_ms_monotonic(),
    ExpiredMS = ets:fun2ms(
        fun(#cache_record{deadline = Deadline}) when Deadline < Now -> true end
    ),
    NumDeleted = ets:select_delete(Tab, ExpiredMS),
    ?tp(info, node_cache_cleanup, #{name => Name, num_deleted => NumDeleted}),
    ok.
282

283
%% Take a size/memory snapshot of the cache table, publish it as gauges and
%% store it in the stats table, where `limits_reached/2' reads it.
update_stats(#{tab := Tab, stat_tab := StatTab, name := Name} = PtState) ->
    #{count := Count, memory := Memory} = tab_stats(Tab),
    Snapshot = #stats{key = ?stat_key, count = Count, memory = Memory},
    ok = set_gauge(PtState, ?metric_count, Count),
    ok = set_gauge(PtState, ?metric_memory, Memory),
    ?tp(info, update_stats, #{name => Name, stats => Snapshot}),
    _ = ets:insert(StatTab, Snapshot),
    ok.
298

299
%% Expiry timestamp for a record inserted now: the current monotonic time
%% plus the configured `cache_ttl'.
deadline(ConfigPath) ->
    Now = now_ms_monotonic(),
    Ttl = config_value(ConfigPath, cache_ttl),
    Now + Ttl.
301

302
%% Delay between two cleanup runs; falls back to
%% `?DEFAULT_CLEANUP_INTERVAL' when `cleanup_interval' is not configured.
cleanup_interval(PtState) ->
    #{config_path := ConfigPath} = PtState,
    config_value(ConfigPath, cleanup_interval, ?DEFAULT_CLEANUP_INTERVAL).
304

305
%% Delay between two stats refreshes; falls back to
%% `?DEFAULT_STAT_UPDATE_INTERVAL' when `stat_update_interval' is not
%% configured.
stat_update_interval(PtState) ->
    #{config_path := ConfigPath} = PtState,
    config_value(ConfigPath, stat_update_interval, ?DEFAULT_STAT_UPDATE_INTERVAL).
307

308
%% Current Erlang monotonic time in milliseconds. Used for all deadlines so
%% that expiry is unaffected by wall-clock adjustments.
now_ms_monotonic() ->
    Unit = millisecond,
    erlang:monotonic_time(Unit).
310

311
%% Read the mandatory setting `Key' from the config map found at
%% `ConfigPath'. Fails with `{badkey, Key}' when the key is absent.
config_value(ConfigPath, Key) ->
    Config = emqx_config:get(ConfigPath),
    maps:get(Key, Config).

%% Read the optional setting `Key' from the config map found at
%% `ConfigPath', returning `Default' when the key is absent.
config_value(ConfigPath, Key, Default) ->
    Config = emqx_config:get(ConfigPath),
    maps:get(Key, Config, Default).
316

317
%% Look `Key' up in the cache table.
%%   {ok, Value} - a record exists and its deadline is still in the future;
%%   not_found   - no record, or the record has expired;
%%   error       - the table could not be accessed (`badarg').
lookup(Tab, Key) ->
    Now = now_ms_monotonic(),
    try ets:lookup(Tab, Key) of
        [#cache_record{value = Value, deadline = Deadline}] when Deadline > Now ->
            {ok, Value};
        _ExpiredOrMissing ->
            not_found
    catch
        error:badarg -> error
    end.
327

328
%% Honour the callback's caching decision and return the bare value:
%% `{nocache, V}' is passed through, `{cache, V}' is (subject to the
%% configured limits) stored under `Key' first.
maybe_cache(_PtState, _Key, {nocache, Value}) ->
    Value;
maybe_cache(PtState, Key, {cache, Value}) ->
    ok = maybe_insert(PtState, Key, Value),
    Value.
333

334
%% Strip the cache/nocache tag from a callback result without caching it.
dont_cache({Tag, Value}) when Tag =:= nocache; Tag =:= cache ->
    Value.
336

337
%% Size statistics of an ETS table.
%%
%% Returns `#{count => Objects, memory => Bytes}', where the byte figure is
%% the word count reported by ETS multiplied by the emulator word size, or
%% `not_found' when the table is not available.
%%
%% `ets:info/2' answers `undefined' (rather than raising `badarg') for a
%% table that no longer exists, so that case is matched explicitly instead of
%% being multiplied and blowing up with `badarith'. `badarg' is still caught
%% for arguments that are not table identifiers at all.
tab_stats(Tab) ->
    try {ets:info(Tab, memory), ets:info(Tab, size)} of
        {Words, Count} when is_integer(Words), is_integer(Count) ->
            #{count => Count, memory => Words * erlang:system_info(wordsize)};
        _TableGone ->
            not_found
    catch
        error:badarg -> not_found
    end.
345

346
%% Store `Value' under `Key' unless the configured count/memory limits have
%% been reached, in which case the value is silently not cached. Successful
%% attempts bump the insert counter. Always returns `ok'.
%%
%% The trace point is emitted at `debug' level: it fires on every cache miss
%% and carries the cached key and value, so it must not surface as a warning.
%% NOTE(review): assumes snabbkaffe's `?tp/3' turns into a logger call at the
%% given level in non-test builds - confirm against the snabbkaffe version in use.
maybe_insert(#{tab := Tab, stat_tab := StatTab, config_path := ConfigPath} = PtState, Key, Value) ->
    LimitsReached = limits_reached(ConfigPath, StatTab),
    ?tp(debug, node_cache_insert, #{
        key => Key,
        value => Value,
        limits_reached => LimitsReached
    }),
    case LimitsReached of
        true ->
            ok;
        false ->
            ok = inc_metric(PtState, ?metric_insert),
            insert(Tab, Key, Value, ConfigPath)
    end.
360

361
%% Write a cache record with a freshly computed deadline. A `badarg' from
%% ETS (table not accessible) is ignored: failing to cache is not an error
%% for the caller.
insert(Tab, Key, Value, ConfigPath) ->
    ExpiresAt = deadline(ConfigPath),
    Entry = #cache_record{key = Key, value = Value, deadline = ExpiresAt},
    try ets:insert(Tab, Entry) of
        true -> ok
    catch
        error:badarg -> ok
    end.
372

373
%% Tell whether the cache is full according to the last stats snapshot.
%%
%% `max_count' and `max_memory' default to `unlimited'; a limit only applies
%% when it is configured as an integer, and it is reached when the
%% snapshot value is greater than or equal to it.
%%
%% The stats table is read from client processes, so it may be unavailable at
%% the moment of the lookup. Like `lookup/2' and `insert/4', a `badarg' from
%% ETS is tolerated here: the cache is reported as full, which simply skips
%% the insert instead of crashing the caller.
%%
%% The trace point is emitted at `debug' level since it fires on every
%% insert attempt.
limits_reached(ConfigPath, StatTab) ->
    MaxCount = config_value(ConfigPath, max_count, ?unlimited),
    MaxMemory = config_value(ConfigPath, max_memory, ?unlimited),
    try ets:lookup(StatTab, ?stat_key) of
        [#stats{count = Count, memory = Memory}] ->
            ?tp(debug, node_cache_limits, #{
                count => Count,
                memory => Memory,
                max_count => MaxCount,
                max_memory => MaxMemory
            }),
            (is_integer(MaxCount) andalso Count >= MaxCount) orelse
                (is_integer(MaxMemory) andalso Memory >= MaxMemory)
    catch
        error:badarg -> true
    end.
388

389
%% Fetch the shared state published for this server's cache name.
pt_state(State) ->
    #{name := Name} = State,
    persistent_term:get(?pt_key(Name)).
391

392
%%--------------------------------------------------------------------
393
%% Metric helpers
394
%%--------------------------------------------------------------------
395

396
%% Extract the given counters from raw metrics.
%% A counter is included, as `#{value => V, rate => R}', only when it is
%% present under both the `counters' and the `rate' keys; others are skipped.
fold_counters(RawMetrics, Counters) ->
    Pick = fun(Counter) ->
        case RawMetrics of
            #{counters := #{Counter := Value}, rate := #{Counter := Rate}} ->
                {true, {Counter, #{value => Value, rate => Rate}}};
            _ ->
                false
        end
    end,
    maps:from_list(lists:filtermap(Pick, Counters)).
409

410
%% Extract the given gauges from raw metrics.
%% Gauges missing under the `gauges' key are skipped.
fold_gauges(RawMetrics, Gauges) ->
    Pick = fun(Gauge) ->
        case RawMetrics of
            #{gauges := #{Gauge := Value}} ->
                {true, {Gauge, Value}};
            _ ->
                false
        end
    end,
    maps:from_list(lists:filtermap(Pick, Gauges)).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc