• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 14228011056

02 Apr 2025 07:32PM UTC coverage: 83.529%. First build
14228011056

Pull #14972

github

web-flow
Merge 6081a6b2e into ce6b1227a
Pull Request #14972: feat(plugins): implement config upload/download

28 of 32 new or added lines in 3 files covered. (87.5%)

62071 of 74311 relevant lines covered (83.53%)

16327.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.85
/apps/emqx_plugins/src/emqx_plugins_apps.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4

5
-module(emqx_plugins_apps).
6

7
-include("emqx_plugins.hrl").
8
-include_lib("emqx/include/logger.hrl").
9
-include_lib("snabbkaffe/include/trace.hrl").
10

11
%% Plugin's app lifecycle
12
-export([
13
    start/1,
14
    load/2,
15
    unload/1,
16
    stop/1,
17
    running_status/1
18
]).
19

20
%% Triggering app's callbacks
21
-export([
22
    on_config_changed/3,
23
    on_health_check/2
24
]).
25

26
-type health_check_options() :: #{}.
27

28
%%--------------------------------------------------------------------
29
%% API
30
%%--------------------------------------------------------------------
31

32
%% Report the state of the application named by `NameVsn`:
%% `running`, `loaded` (but not running) or `stopped` (not even loaded).
-spec running_status(name_vsn()) -> running | loaded | stopped.
running_status(NameVsn) ->
    {App, _Vsn} = emqx_plugins_utils:parse_name_vsn(NameVsn),
    Running = running_apps(),
    Loaded = loaded_apps(),
    app_running_status(App, Running, Loaded).
38

39
%% Start every application listed under `rel_apps`, in list order.
%% All names are parsed up front; starting stops at the first failure,
%% whose reason is returned as `{error, Reason}`.
-spec start(emqx_plugins_info:t()) -> ok | {error, term()}.
start(#{rel_apps := Apps}) ->
    AppNames = [
        begin
            {Name, _Vsn} = emqx_plugins_utils:parse_name_vsn(NameVsn),
            Name
        end
     || NameVsn <- Apps
    ],
    try
        lists:foreach(fun start_app_or_throw/1, AppNames)
    catch
        throw:Reason ->
            {error, Reason}
    end.

%% Start one app; a failure is thrown so that `start/1` can abort the iteration.
start_app_or_throw(AppName) ->
    case start_app(AppName) of
        ok -> ok;
        {error, Reason} -> throw(Reason)
    end.
63

64
%% Stop all apps installed by the plugin package,
%% but not the ones shared with others.
%%
%% Apps that other running applications still depend on are left running;
%% this is logged as a warning and still reported as `ok`.
-spec stop(emqx_plugins_info:t()) -> ok | {error, term()}.
stop(#{rel_apps := Apps}) ->
    %% collect the app names that are eligible for stopping
    Candidates = lists:filtermap(fun parse_name_vsn_for_stopping/1, Apps),
    case stop_apps(Candidates) of
        {error, Reason} ->
            {error, Reason};
        {ok, []} ->
            %% nothing left over: every app was stopped
            ok;
        {ok, StillNeeded} ->
            ?SLOG(warning, #{
                msg => "unabled_to_stop_plugin_apps",
                apps => StillNeeded,
                reason => "running_apps_still_depends_on_this_apps"
            }),
            ok
    end.
84

85
%% Load the beam code and application spec of every app in `rel_apps`.
%% Each app is expected under `LibDir/<name-vsn>/ebin`. Loading stops at
%% the first failure, whose reason is returned as `{error, Reason}`.
-spec load(emqx_plugins_info:t(), file:filename()) -> ok | {error, term()}.
load(#{rel_apps := Apps}, LibDir) ->
    %% snapshot taken once, before anything is loaded
    AlreadyLoaded = loaded_apps(),
    LoadOne = fun(NameVsn) ->
        {Name, Vsn} = emqx_plugins_utils:parse_name_vsn(NameVsn),
        Ebin = filename:join([LibDir, NameVsn, "ebin"]),
        case load_plugin_app(Name, Vsn, Ebin, AlreadyLoaded) of
            ok -> ok;
            {error, Why} -> throw(Why)
        end
    end,
    try
        lists:foreach(LoadOne, Apps)
    catch
        throw:Reason ->
            {error, Reason}
    end.
105

106
%% Unload the plugin's apps that are loaded but not running.
%% The running/loaded snapshots are taken once, before any app is unloaded.
-spec unload(emqx_plugins_info:t()) -> ok | {error, term()}.
unload(#{rel_apps := Apps}) ->
    Running = running_apps(),
    Loaded = loaded_apps(),
    ToUnload = lists:filtermap(fun parse_name_vsn_for_stopping/1, Apps),
    ?SLOG(info, #{
        msg => "emqx_plugins_unloading_apps",
        apps => ToUnload
    }),
    unload_apps(ToUnload, Running, Loaded).
116

117
%%--------------------------------------------------------------------
118
%% API for triggering app's callbacks
119
%%--------------------------------------------------------------------
120

121
%% Invoke the plugin's optional `on_config_changed/2` callback with the
%% old and the new configuration.
-spec on_config_changed(name_vsn(), map(), map()) -> ok | {error, term()}.
on_config_changed(NameVsn, OldConf, NewConf) ->
    Callback = {on_config_changed, 2},
    CallbackArgs = [OldConf, NewConf],
    apply_callback(NameVsn, Callback, CallbackArgs).
124

125
%% Invoke the plugin's optional `on_health_check/1` callback with `Options`.
-spec on_health_check(name_vsn(), health_check_options()) -> ok | {error, term()}.
on_health_check(NameVsn, Options) ->
    Callback = {on_health_check, 1},
    apply_callback(NameVsn, Callback, [Options]).
128

129
%%--------------------------------------------------------------------
130
%% Internal functions
131
%%--------------------------------------------------------------------
132

133
%% Call `FuncName/Arity` in the plugin's `<app>_app` module with `Args`.
%% A missing module or a callback that is not exported is not an error:
%% it is logged at info level and `ok` is returned.
apply_callback(NameVsn, {FuncName, Arity} = Callback, Args) ->
    maybe
        {ok, Mod} ?= app_module_name(NameVsn),
        ok ?= is_callback_exported(Mod, FuncName, Arity),
        invoke_plugin_callback(Mod, Callback, Args)
    else
        {error, Reason} ->
            ?SLOG(info, #{
                msg => "callback_not_found", callback => {FuncName, Arity}, reason => Reason
            }),
            ok;
        _ ->
            ok
    end.

%% Run the callback and normalise its outcome: `ok` and `{error, _}` pass
%% through, any other value is wrapped as `bad_callback_return_value`, and an
%% exception is logged and returned as `{error, Reason}`.
invoke_plugin_callback(Mod, {FuncName, Arity}, Args) ->
    try erlang:apply(Mod, FuncName, Args) of
        ok ->
            ok;
        {error, _} = Error ->
            Error;
        Unexpected ->
            {error, {bad_callback_return_value, Unexpected}}
    catch
        Class:Reason:Stacktrace ->
            ?SLOG(error, #{
                msg => "failed_to_apply_plugin_callback",
                callback => {FuncName, Arity},
                exception => Class,
                reason => Reason,
                stacktrace => Stacktrace
            }),
            {error, Reason}
    end.
161

162
%% Classify `AppName` against two `{Name, Vsn}` lists:
%% not in `LoadedApps` -> `stopped`; loaded but not in `RunningApps` -> `loaded`;
%% present in both -> `running`.
app_running_status(AppName, RunningApps, LoadedApps) ->
    Loaded = lists:keyfind(AppName, 1, LoadedApps),
    %% the running list is only consulted for apps that are loaded
    Running = Loaded =/= false andalso lists:keyfind(AppName, 1, RunningApps),
    case {Loaded, Running} of
        {false, _} -> stopped;
        {{AppName, _}, false} -> loaded;
        {{AppName, _}, {AppName, _}} -> running
    end.
172

173
%% Load `AppName` from `Ebin` unless an app of that name is already loaded.
%% An already-loaded app is never reloaded; if its version differs from
%% `AppVsn` a warning is logged and `ok` is still returned.
load_plugin_app(AppName, AppVsn, Ebin, LoadedApps) ->
    case lists:keyfind(AppName, 1, LoadedApps) of
        {_, LoadedVsn} ->
            case bin(LoadedVsn) =/= bin(AppVsn) of
                true ->
                    %% loaded, but with a different version
                    ?SLOG(warning, #{
                        msg => "plugin_app_already_loaded",
                        name => AppName,
                        loaded_vsn => LoadedVsn,
                        loading_vsn => AppVsn
                    }),
                    ok;
                false ->
                    %% the exact version is already loaded
                    ok
            end;
        false ->
            do_load_plugin_app(AppName, Ebin)
    end.
193

194
%% Put `Ebin` at the front of the code path, load every `*.beam` file found
%% in it, then load the application spec. An `already_loaded` reply from
%% `application:load/1` counts as success; any other error is wrapped in a
%% `failed_to_load_plugin_app` map.
do_load_plugin_app(AppName, Ebin) when is_binary(Ebin) ->
    do_load_plugin_app(AppName, binary_to_list(Ebin));
do_load_plugin_app(AppName, Ebin) ->
    _ = code:add_patha(Ebin),
    BeamFiles = filelib:wildcard(filename:join([Ebin, "*.beam"])),
    Outcome =
        case load_modules(BeamFiles) of
            ok -> application:load(AppName);
            ModulesError -> ModulesError
        end,
    case Outcome of
        ok ->
            ok;
        {error, {already_loaded, _}} ->
            ok;
        {error, Reason} ->
            {error, #{
                msg => "failed_to_load_plugin_app",
                name => AppName,
                reason => Reason
            }}
    end.
212

213
%% Load the module named by each beam file, in order, purging any old code
%% of that module first. Stops at the first module that fails to load.
%% Note: the module is resolved by name via `code:load_file/1`, i.e. through
%% the code path, not by the absolute path in `BeamPath`.
load_modules([]) ->
    ok;
load_modules([BeamPath | Rest]) ->
    Mod = list_to_atom(filename:basename(BeamPath, ".beam")),
    _ = code:purge(Mod),
    load_modules(Rest, BeamPath, code:load_file(Mod)).

%% Continue with the remaining files, or report the file that failed.
load_modules(Rest, _BeamPath, {module, _Mod}) ->
    load_modules(Rest);
load_modules(_Rest, BeamPath, {error, Why}) ->
    {error, #{msg => "failed_to_load_plugin_beam", path => BeamPath, reason => Why}}.
224

225
%% Start `App` and its dependencies, allowing at most 10 seconds.
%% Logs the list of newly started apps when it is not empty.
start_app(App) ->
    case run_with_timeout(application, ensure_all_started, [App], 10_000) of
        {ok, {ok, []}} ->
            %% everything was already running
            ok;
        {ok, {ok, Started}} ->
            ?SLOG(debug, #{msg => "started_plugin_apps", apps => Started});
        {ok, {error, StartError}} ->
            {error, #{
                msg => "failed_to_start_app",
                app => App,
                reason => StartError
            }};
        {error, RunError} ->
            {error, #{
                msg => "failed_to_start_plugin_app",
                app => App,
                reason => RunError
            }}
    end.
245

246
%% On one hand, Elixir plugins might include Elixir itself, when targeting a non-Elixir
247
%% EMQX release.  If, on the other hand, the EMQX release already includes Elixir, we
248
%% shouldn't stop Elixir nor IEx.
249
-ifdef(EMQX_ELIXIR).
%% Apps that must never be stopped or unloaded together with a plugin.
is_protected_app(AppName) ->
    lists:member(AppName, [elixir, iex]).

%% `lists:filtermap/2` callback: keep the app name unless it is protected.
parse_name_vsn_for_stopping(NameVsn) ->
    {AppName, _AppVsn} = emqx_plugins_utils:parse_name_vsn(NameVsn),
    not is_protected_app(AppName) andalso {true, AppName}.
%% ELSE ifdef(EMQX_ELIXIR)
-else.
%% `lists:filtermap/2` callback: every app name is kept.
parse_name_vsn_for_stopping(NameVsn) ->
    {AppName, _AppVsn} = emqx_plugins_utils:parse_name_vsn(NameVsn),
    {true, AppName}.
%% END ifdef(EMQX_ELIXIR)
-endif.
269

270
%% Stop `Apps` in passes. Apps still needed by a running app are retried in
%% the next pass, since stopping their dependants may have freed them.
%% Returns `{ok, Leftover}` once a pass makes no progress (`[]` when all
%% apps were stopped).
stop_apps(Apps) ->
    case do_stop_apps(Apps, [], running_apps()) of
        {error, Reason} ->
            {error, Reason};
        {ok, []} ->
            %% all stopped
            {ok, []};
        {ok, Apps} ->
            %% the leftover equals the input: no progress, give up
            {ok, Apps};
        {ok, Leftover} ->
            %% some apps were stopped: try the rest again
            stop_apps(Leftover)
    end.
281

282
%% One pass over the apps to stop. Apps needed by any app in `RunningApps`
%% are collected (in their original order) instead of being stopped.
%% Aborts on the first stop failure.
do_stop_apps([], Deferred, _RunningApps) ->
    {ok, lists:reverse(Deferred)};
do_stop_apps([App | Rest], Deferred, RunningApps) ->
    case is_needed_by_any(App, RunningApps) orelse stop_app(App) of
        true ->
            %% still a dependency of a running app: leave it for a later pass
            do_stop_apps(Rest, [App | Deferred], RunningApps);
        ok ->
            do_stop_apps(Rest, Deferred, RunningApps);
        {error, Reason} ->
            {error, Reason}
    end.
296

297
%% Unload each app that is loaded but not running. Running apps are skipped
%% with a warning; apps that are not loaded are only noted in the debug log.
%% Status is judged against the snapshots passed in, not re-read per app.
unload_apps(Apps, RunningApps, LoadedApps) ->
    UnloadOne = fun(App) ->
        case app_running_status(App, RunningApps, LoadedApps) of
            stopped ->
                ?SLOG(debug, #{msg => "emqx_plugins_app_already_unloaded", app => App});
            loaded ->
                ?SLOG(debug, #{msg => "emqx_plugins_unloading_loaded_app", app => App}),
                ok = unload_modules_and_app(App);
            running ->
                ?SLOG(warning, #{msg => "emqx_plugins_cannot_unload_running_app", app => App})
        end
    end,
    lists:foreach(UnloadOne, Apps).
312

313
%% Stop a single application. An app that was not started counts as success.
stop_app(App) ->
    stop_app_result(App, application:stop(App)).

%% Translate the `application:stop/1` reply into `ok | {error, map()}`.
stop_app_result(App, ok) ->
    ?SLOG(debug, #{msg => "emqx_plugins_stop_plugin_successfully", app => App}),
    ok;
stop_app_result(App, {error, {not_started, App}}) ->
    ?SLOG(debug, #{msg => "emqx_plugins_plugin_not_started", app => App}),
    ok;
stop_app_result(App, {error, Reason}) ->
    {error, #{msg => "emqx_plugins_failed_to_stop_app", app => App, reason => Reason}}.
324

325
%% Soft-purge the modules listed in the app spec (when the spec is
%% available), then unload the application. Always returns `ok`; the result
%% of `application:unload/1` is only logged.
unload_modules_and_app(App) ->
    _ = soft_purge_app_modules(App, application:get_key(App, modules)),
    UnloadResult = application:unload(App),
    ?SLOG(debug, #{msg => "emqx_plugins_unloaded_app", app => App, result => UnloadResult}),
    ok.

%% Soft-purge every module of the app; nothing to do without a module list.
soft_purge_app_modules(App, {ok, Modules}) ->
    ?SLOG(debug, #{msg => "emqx_plugins_purging_modules", app => App, modules => Modules}),
    _ = [code:soft_purge(Module) || Module <- Modules],
    ok;
soft_purge_app_modules(_App, _NoModules) ->
    ok.
336

337
%% True when at least one of the running `{App, Vsn}` entries lists
%% `AppToStop` among its dependencies. Stops at the first hit.
is_needed_by_any(_AppToStop, []) ->
    false;
is_needed_by_any(AppToStop, [{RunningApp, _RunningAppVsn} | Rest]) ->
    is_needed_by(AppToStop, RunningApp) orelse is_needed_by_any(AppToStop, Rest).
344

345
%% True when `RunningApp` declares `AppToStop` in the `applications` key of
%% its app spec. An app never counts as needed by itself, and an app whose
%% spec is unavailable has no dependencies.
is_needed_by(App, App) ->
    false;
is_needed_by(AppToStop, RunningApp) ->
    Deps =
        case application:get_key(RunningApp, applications) of
            {ok, Listed} -> Listed;
            undefined -> []
        end,
    lists:member(AppToStop, Deps).
352

353
%% `{Name, Vsn}` pairs of all currently running applications
%% (the description field is dropped).
running_apps() ->
    [{Name, Vsn} || {Name, _Desc, Vsn} <- application:which_applications(infinity)].
360

361
%% `{Name, Vsn}` pairs of all currently loaded applications
%% (the description field is dropped).
loaded_apps() ->
    [{Name, Vsn} || {Name, _Desc, Vsn} <- application:loaded_applications()].
368

369
%% Run `Module:Function(Args)` in a separate, monitored process and wait at
%% most `Timeout` milliseconds for it.
%%
%% Returns:
%%   `{ok, Result}`     - the call returned `Result` in time;
%%   `{error, timeout}` - the call did not finish in time (the worker is killed);
%%   `{error, Reason}`  - the worker died before replying; `Reason` is its
%%                        exit reason, reported at once instead of after `Timeout`.
%%
%% No stray message is left in the caller's mailbox: the reply is tagged with
%% a unique reference, the monitor is flushed, and a reply racing with the
%% timeout is drained.
run_with_timeout(Module, Function, Args, Timeout) ->
    Caller = self(),
    Tag = make_ref(),
    {Pid, MRef} = spawn_monitor(fun() ->
        Caller ! {Tag, apply(Module, Function, Args)}
    end),
    receive
        {Tag, Result} ->
            %% the reply is always sent before the worker exits, so it is
            %% matched ahead of the 'DOWN' message, which is flushed here
            _ = erlang:demonitor(MRef, [flush]),
            {ok, Result};
        {'DOWN', MRef, process, Pid, Reason} ->
            {error, Reason}
    after Timeout ->
        exit(Pid, kill),
        %% once 'DOWN' has arrived, nothing more can come from the worker
        receive
            {'DOWN', MRef, process, Pid, _} -> ok
        end,
        %% discard a reply that was sent just before the kill took effect
        receive
            {Tag, _LateResult} -> ok
        after 0 -> ok
        end,
        {error, timeout}
    end.
385

386
%% Resolve the plugin's callback module, named `<app>_app`.
%% Only an already existing atom is accepted, so no new atom is ever created
%% from the plugin name.
app_module_name(NameVsn) ->
    {AppName, _Vsn} = emqx_plugins_utils:parse_name_vsn(NameVsn),
    ModuleName = iolist_to_binary([bin(AppName), <<"_app">>]),
    case emqx_utils:safe_to_existing_atom(ModuleName) of
        {error, Reason} ->
            {error, {undefined_app_module, AppName, Reason}};
        {ok, AppModule} ->
            {ok, AppModule}
    end.
394

395
%% Check that `AppModule` exports `FuncName/Arity`.
%% Returns `ok`, or `{error, {callback_not_exported, AppModule, FuncName, Arity}}`.
is_callback_exported(AppModule, FuncName, Arity) ->
    %% erlang:function_exported/3 answers `false` for a module that is not
    %% loaded yet, so try to load it first. A failure to load is ignored on
    %% purpose: the check below then reports the callback as not exported.
    _ = code:ensure_loaded(AppModule),
    case erlang:function_exported(AppModule, FuncName, Arity) of
        true -> ok;
        false -> {error, {callback_not_exported, AppModule, FuncName, Arity}}
    end.
400

401
%% Convert a binary, a character list or an atom to a UTF-8 binary.
bin(Binary) when is_binary(Binary) -> Binary;
bin(Chars) when is_list(Chars) -> unicode:characters_to_binary(Chars, utf8);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc