• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 14194765223

01 Apr 2025 11:46AM UTC coverage: 83.423%. First build
14194765223

Pull #14957

github

web-flow
Merge 94ec173ec into 1f2774e5a
Pull Request #14957: feat(emqx_plugins): respect plugin's `on_config_changed` callback response

69 of 84 new or added lines in 3 files covered. (82.14%)

62042 of 74370 relevant lines covered (83.42%)

16908.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.57
/apps/emqx_plugins/src/emqx_plugins_apps.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4

5
-module(emqx_plugins_apps).
6

7
%% Plugin's app lifecycle
8
-export([
9
    start/1,
10
    load/2,
11
    unload/1,
12
    stop/1,
13
    running_status/1
14
]).
15

16
%% Triggering app's callbacks
17
-export([
18
    on_config_changed/3
19
]).
20

21
-include("emqx_plugins.hrl").
22
-include_lib("emqx/include/logger.hrl").
23
-include_lib("snabbkaffe/include/trace.hrl").
24

25
%% @doc Report the state of the application named by `NameVsn':
%% `running', `loaded' (loaded but not running) or `stopped' (not loaded).
-spec running_status(name_vsn()) -> running | loaded | stopped.
running_status(NameVsn) ->
    {App, _Vsn} = emqx_plugins_utils:parse_name_vsn(NameVsn),
    Running = running_apps(),
    Loaded = loaded_apps(),
    app_running_status(App, Running, Loaded).
31

32
%% @doc Stop the applications installed by the plugin package.
%% Applications that other running applications still depend on are left
%% running; this is reported with a warning and still yields `ok'.
-spec stop(emqx_plugins_info:t()) -> ok | {error, term()}.
stop(#{rel_apps := RelApps}) ->
    %% translate the release entries into the application names to stop
    Candidates = lists:filtermap(fun parse_name_vsn_for_stopping/1, RelApps),
    case stop_apps(Candidates) of
        {error, _} = Error ->
            Error;
        {ok, []} ->
            %% nothing left behind: every app was stopped
            ok;
        {ok, StillNeeded} ->
            ?SLOG(warning, #{
                msg => "unabled_to_stop_plugin_apps",
                apps => StillNeeded,
                reason => "running_apps_still_depends_on_this_apps"
            }),
            ok
    end.
52

53
%% @doc Unload the plugin's applications that are loaded but not running.
%%
%% The `rel_apps' entries are name-vsn strings, whereas the running/loaded
%% application lists are keyed by application name. The entries are therefore
%% parsed first (with the same helper `stop/1' uses, so protected applications
%% are skipped); looking the raw name-vsn up directly never matches, which
%% makes every application look `stopped' and nothing gets unloaded.
-spec unload(emqx_plugins_info:t()) -> ok | {error, term()}.
unload(#{rel_apps := Apps}) ->
    RunningApps = running_apps(),
    LoadedApps = loaded_apps(),
    AppNames = lists:filtermap(fun parse_name_vsn_for_stopping/1, Apps),
    unload_apps(AppNames, RunningApps, LoadedApps).
58

59
%% @doc Load every application of the plugin (beam code and app spec) from
%% `LibDir/<name-vsn>/ebin'. Stops at the first failure and returns it as
%% `{error, Reason}'.
-spec load(emqx_plugins_info:t(), file:filename()) -> ok | {error, term()}.
load(#{rel_apps := RelApps}, LibDir) ->
    AlreadyLoaded = loaded_apps(),
    LoadOne = fun(NameVsn) ->
        {Name, Vsn} = emqx_plugins_utils:parse_name_vsn(NameVsn),
        Ebin = filename:join([LibDir, NameVsn, "ebin"]),
        case load_plugin_app(Name, Vsn, Ebin, AlreadyLoaded) of
            ok -> ok;
            %% abort the iteration; converted back to a tuple below
            {error, LoadError} -> throw(LoadError)
        end
    end,
    try
        lists:foreach(LoadOne, RelApps)
    catch
        throw:Thrown ->
            {error, Thrown}
    end.
79

80
%% @doc Start the plugin's applications in the order listed in `rel_apps'.
%% All entries are parsed before anything is started; the first start failure
%% aborts the sequence and is returned as `{error, Reason}'.
-spec start(emqx_plugins_info:t()) -> ok | {error, term()}.
start(#{rel_apps := RelApps}) ->
    Names = [
        begin
            {Name, _Vsn} = emqx_plugins_utils:parse_name_vsn(NameVsn),
            Name
        end
     || NameVsn <- RelApps
    ],
    StartOne = fun(Name) ->
        case start_app(Name) of
            ok -> ok;
            {error, StartError} -> throw(StartError)
        end
    end,
    try
        lists:foreach(StartOne, Names)
    catch
        throw:Thrown ->
            {error, Thrown}
    end.
104

105
%% @doc Call the plugin's `on_config_changed/2' callback, which is looked up
%% in the module `<app>_app'.
%%
%% Returns `ok' when the callback returns `ok', or when no callback can be
%% found (unknown module atom or function not exported). An `{error, _}'
%% returned by the callback is passed through, any other return value is
%% wrapped as `{error, {bad_on_config_changed_return_value, Value}}', and an
%% exception is logged and returned as `{error, Reason}'.
-spec on_config_changed(name_vsn(), map(), map()) -> ok | {error, term()}.
on_config_changed(NameVsn, OldConf, NewConf) ->
    case resolve_config_callback(NameVsn) of
        {ok, AppModule} ->
            %% only the callback invocation itself is protected by the try
            try AppModule:on_config_changed(OldConf, NewConf) of
                ok -> ok;
                {error, _} = Failure -> Failure;
                Unexpected -> {error, {bad_on_config_changed_return_value, Unexpected}}
            catch
                Class:Reason:Stacktrace ->
                    ?SLOG(error, #{
                        msg => "failed_to_call_on_config_changed",
                        exception => Class,
                        reason => Reason,
                        stacktrace => Stacktrace
                    }),
                    {error, Reason}
            end;
        {error, NotFound} ->
            ?SLOG(info, #{msg => "on_config_changed_callback_not_found", reason => NotFound}),
            ok;
        _ ->
            ok
    end.

%% Resolve the module exporting `on_config_changed/2' for the given plugin;
%% yields `{ok, Module}' or the first lookup result that is not a success.
resolve_config_callback(NameVsn) ->
    case app_module_name(NameVsn) of
        {ok, AppModule} ->
            case is_callback_exported(AppModule, on_config_changed, 2) of
                ok -> {ok, AppModule};
                NotExported -> NotExported
            end;
        NoModule ->
            NoModule
    end.
133

134
%%--------------------------------------------------------------------
135
%% Internal functions
136
%%--------------------------------------------------------------------
137

138
%% Classify `AppName' against `{Name, Vsn}' lists of running and loaded
%% applications: not loaded -> `stopped'; loaded and running -> `running';
%% loaded only -> `loaded'.
app_running_status(AppName, RunningApps, LoadedApps) ->
    IsLoaded = lists:keymember(AppName, 1, LoadedApps),
    %% the running list is only consulted for loaded applications
    IsRunning = IsLoaded andalso lists:keymember(AppName, 1, RunningApps),
    case {IsLoaded, IsRunning} of
        {false, _} -> stopped;
        {true, true} -> running;
        {true, false} -> loaded
    end.
148

149
%% Load `AppName' from `Ebin' unless an application of that name is already
%% loaded. An already loaded application is left untouched; if its version
%% differs from `AppVsn' a warning is logged. Both cases return `ok'.
load_plugin_app(AppName, AppVsn, Ebin, LoadedApps) ->
    case lists:keyfind(AppName, 1, LoadedApps) of
        {_, LoadedVsn} ->
            case bin(LoadedVsn) =/= bin(AppVsn) of
                true ->
                    %% loaded, but on a different version
                    ?SLOG(warning, #{
                        msg => "plugin_app_already_loaded",
                        name => AppName,
                        loaded_vsn => LoadedVsn,
                        loading_vsn => AppVsn
                    });
                false ->
                    %% already loaded on the exact version
                    ok
            end,
            ok;
        false ->
            do_load_plugin_app(AppName, Ebin)
    end.
169

170
%% Prepend `Ebin' to the code path, load every `*.beam' file found there and
%% then load the application spec. An application that is already loaded
%% counts as success; any other failure is wrapped in a
%% "failed_to_load_plugin_app" error map.
do_load_plugin_app(AppName, Ebin) when is_binary(Ebin) ->
    do_load_plugin_app(AppName, binary_to_list(Ebin));
do_load_plugin_app(AppName, Ebin) ->
    _ = code:add_patha(Ebin),
    BeamFiles = filelib:wildcard(filename:join([Ebin, "*.beam"])),
    Outcome =
        case load_modules(BeamFiles) of
            ok -> application:load(AppName);
            ModulesError -> ModulesError
        end,
    case Outcome of
        ok ->
            ok;
        {error, {already_loaded, _}} ->
            ok;
        {error, Reason} ->
            {error, #{
                msg => "failed_to_load_plugin_app",
                name => AppName,
                reason => Reason
            }}
    end.
188

189
%% Load the modules named by the given beam file paths, one after another,
%% stopping at the first one that fails to load.
load_modules([]) ->
    ok;
load_modules([BeamFile | Rest]) ->
    case load_beam(BeamFile) of
        ok -> load_modules(Rest);
        {error, _} = Error -> Error
    end.

%% Purge old code of the module named after `BeamFile' and load the module
%% through the code server (the path itself is only used to derive the name).
load_beam(BeamFile) ->
    Module = list_to_atom(filename:basename(BeamFile, ".beam")),
    _ = code:purge(Module),
    case code:load_file(Module) of
        {error, Reason} ->
            {error, #{msg => "failed_to_load_plugin_beam", path => BeamFile, reason => Reason}};
        {module, _} ->
            ok
    end.
200

201
%% Start `App' together with its dependencies, waiting at most 10 seconds.
%%
%% Returns `ok' on success, or `{error, Map}' describing the failure:
%% "failed_to_start_app" when `application:ensure_all_started/1' itself
%% reports an error, "failed_to_start_plugin_app" when the call could not be
%% completed (e.g. it timed out).
%%
%% The success branches return `ok' explicitly instead of relying on the
%% value the logging macro happens to evaluate to; callers match on `ok'.
start_app(App) ->
    case run_with_timeout(application, ensure_all_started, [App], 10_000) of
        {ok, {ok, []}} ->
            %% nothing had to be started
            ok;
        {ok, {ok, Started}} ->
            ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started}),
            ok;
        {ok, {error, Reason}} ->
            {error, #{
                msg => "failed_to_start_app",
                app => App,
                reason => Reason
            }};
        {error, Reason} ->
            {error, #{
                msg => "failed_to_start_plugin_app",
                app => App,
                reason => Reason
            }}
    end.
221

222
%% On one hand, Elixir plugins might include Elixir itself, when targeting a
%% non-Elixir EMQX release.  If, on the other hand, the EMQX release already
%% includes Elixir, we shouldn't stop Elixir nor IEx.
-ifdef(EMQX_ELIXIR).
%% Applications that belong to the release itself and must be left alone.
is_protected_app(App) ->
    lists:member(App, [elixir, iex]).

%% `lists:filtermap/2' callback: drop protected applications, map every
%% other name-vsn to its application name.
parse_name_vsn_for_stopping(NameVsn) ->
    {AppName, _AppVsn} = emqx_plugins_utils:parse_name_vsn(NameVsn),
    case is_protected_app(AppName) of
        false -> {true, AppName};
        true -> false
    end.
%% ELSE ifdef(EMQX_ELIXIR)
-else.
%% `lists:filtermap/2' callback: map every name-vsn to its application name.
parse_name_vsn_for_stopping(NameVsn) ->
    {AppName, _AppVsn} = emqx_plugins_utils:parse_name_vsn(NameVsn),
    {true, AppName}.
%% END ifdef(EMQX_ELIXIR)
-endif.
245

246
%% Stop `Apps' in repeated passes until either all are stopped or a pass
%% makes no progress. Returns `{ok, NotStopped}' or the first stop error.
stop_apps(Apps) ->
    case do_stop_apps(Apps, [], running_apps()) of
        %% all stopped
        {ok, []} ->
            {ok, []};
        %% no progress: `Apps' is already bound, so this clause only matches
        %% when the remaining list is exactly the input list
        {ok, Apps} ->
            {ok, Apps};
        %% some were stopped, which may have unblocked others: try again
        {ok, Remaining} ->
            stop_apps(Remaining);
        {error, _} = Error ->
            Error
    end.
257

258
%% One pass over the applications to stop. Applications that some running
%% application depends on are skipped and collected (in input order);
%% the others are stopped. The first stop failure aborts the pass.
do_stop_apps([], Kept, _RunningApps) ->
    {ok, lists:reverse(Kept)};
do_stop_apps([App | Rest], Kept, RunningApps) ->
    %% `stop_app/1' is only evaluated when nothing depends on `App'
    case is_needed_by_any(App, RunningApps) orelse stop_app(App) of
        true ->
            do_stop_apps(Rest, [App | Kept], RunningApps);
        ok ->
            do_stop_apps(Rest, Kept, RunningApps);
        {error, _} = Error ->
            Error
    end.
272

273
%% Unload each application that is loaded but not running. Running
%% applications are only reported with a warning; applications that are not
%% loaded are skipped. Always returns `ok'.
unload_apps(Apps, RunningApps, LoadedApps) ->
    lists:foreach(
        fun(App) ->
            case app_running_status(App, RunningApps, LoadedApps) of
                stopped ->
                    ok;
                loaded ->
                    ok = unload_modules_and_app(App);
                running ->
                    ?SLOG(warning, #{msg => "cannot_unload_running_app", app => App})
            end
        end,
        Apps
    ).
286

287
%% Stop a single application. An application that was not started counts as
%% success; any other failure is returned as a "failed_to_stop_app" error map.
stop_app(App) ->
    StopResult = application:stop(App),
    case StopResult of
        {error, {not_started, App}} ->
            ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
            ok;
        {error, Reason} ->
            {error, #{msg => "failed_to_stop_app", app => App, reason => Reason}};
        ok ->
            ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
            ok
    end.
298

299
%% Soft-purge the modules listed in the application's spec (if the spec is
%% available) and then unload the application. Best effort: the results of
%% the individual purge/unload calls are ignored and `ok' is returned.
unload_modules_and_app(App) ->
    Modules =
        case application:get_key(App, modules) of
            {ok, Listed} -> Listed;
            _ -> []
        end,
    _ = [code:soft_purge(Module) || Module <- Modules],
    _ = application:unload(App),
    ok.
308

309
%% True when at least one of the running applications lists `AppToStop'
%% among its dependencies. Stops checking at the first match.
is_needed_by_any(_AppToStop, []) ->
    false;
is_needed_by_any(AppToStop, [{RunningApp, _RunningAppVsn} | Rest]) ->
    is_needed_by(AppToStop, RunningApp) orelse is_needed_by_any(AppToStop, Rest).
316

317
%% True when `RunningApp' declares `AppToStop' in the `applications' key of
%% its spec. An application never counts as needed by itself, and an
%% application without an available spec needs nothing.
is_needed_by(AppToStop, RunningApp) when AppToStop =:= RunningApp ->
    false;
is_needed_by(AppToStop, RunningApp) ->
    case application:get_key(RunningApp, applications) of
        undefined -> false;
        {ok, Deps} -> lists:member(AppToStop, Deps)
    end.
324

325
%% `{Name, Vsn}' pairs of all currently running applications
%% (the description element is dropped).
running_apps() ->
    [{Name, Vsn} || {Name, _Desc, Vsn} <- application:which_applications(infinity)].
332

333
%% `{Name, Vsn}' pairs of all currently loaded applications
%% (the description element is dropped).
loaded_apps() ->
    [{Name, Vsn} || {Name, _Desc, Vsn} <- application:loaded_applications()].
340

341
%% Run `Module:Function(Args)' in a separate, monitored process and wait at
%% most `Timeout' milliseconds for its result.
%%
%% Returns:
%%   `{ok, Result}'     - the call returned `Result' in time;
%%   `{error, timeout}' - no result within `Timeout'; the worker is killed;
%%   `{error, Reason}'  - the worker terminated with `Reason' before
%%                        delivering a result (reported immediately rather
%%                        than after the timeout has elapsed).
%%
%% The reply is tagged with a fresh reference and the monitor is flushed, so
%% no stray reply, timer or 'DOWN' message is left in the caller's mailbox.
run_with_timeout(Module, Function, Args, Timeout) ->
    Caller = self(),
    Tag = make_ref(),
    {Pid, MRef} = spawn_monitor(fun() ->
        Caller ! {Tag, apply(Module, Function, Args)}
    end),
    receive
        {Tag, Result} ->
            %% the worker exits right after replying; drop its 'DOWN' message
            true = erlang:demonitor(MRef, [flush]),
            {ok, Result};
        {'DOWN', MRef, process, Pid, Reason} ->
            {error, Reason}
    after Timeout ->
        exit(Pid, kill),
        %% wait until the worker is really gone, then discard a reply it may
        %% have sent between the timeout and the kill
        receive
            {'DOWN', MRef, process, Pid, _} -> ok
        end,
        receive
            {Tag, _LateResult} -> ok
        after 0 -> ok
        end,
        {error, timeout}
    end.
357

358
%% Derive the plugin's application module, `<app>_app', from its name-vsn.
%% The atom is only looked up, never created; when it does not exist the
%% result is `{error, {undefined_app_module, AppName, Reason}}'.
app_module_name(NameVsn) ->
    {AppName, _Vsn} = emqx_plugins_utils:parse_name_vsn(NameVsn),
    ModuleName = iolist_to_binary([bin(AppName), "_app"]),
    case emqx_utils:safe_to_existing_atom(ModuleName) of
        {error, Reason} ->
            {error, {undefined_app_module, AppName, Reason}};
        {ok, AppModule} ->
            {ok, AppModule}
    end.
366

367
%% `ok' when `AppModule' exports `FuncName/Arity' according to
%% `erlang:function_exported/3', otherwise
%% `{error, {callback_not_exported, AppModule, FuncName, Arity}}'.
is_callback_exported(AppModule, FuncName, Arity) ->
    Exported = erlang:function_exported(AppModule, FuncName, Arity),
    if
        Exported -> ok;
        true -> {error, {callback_not_exported, AppModule, FuncName, Arity}}
    end.
372

373
%% Convert an atom, a character list or a binary to a (UTF-8) binary;
%% binaries are returned unchanged.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom);
bin(Chars) when is_list(Chars) -> unicode:characters_to_binary(Chars).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc