• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 14194765223

01 Apr 2025 11:46AM UTC coverage: 83.423%. First build
14194765223

Pull #14957

github

web-flow
Merge 94ec173ec into 1f2774e5a
Pull Request #14957: feat(emqx_plugins): respect plugin's `on_config_changed` callback response

69 of 84 new or added lines in 3 files covered. (82.14%)

62042 of 74370 relevant lines covered (83.42%)

16908.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.78
/apps/emqx_plugins/src/emqx_plugins.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2017-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_plugins).
18

19
-feature(maybe_expr, enable).
20

21
-include("emqx_plugins.hrl").
22
-include_lib("emqx/include/logger.hrl").
23
-include_lib("snabbkaffe/include/trace.hrl").
24

25
-ifdef(TEST).
26
-include_lib("eunit/include/eunit.hrl").
27
-endif.
28

29
-export([
30
    describe/1,
31
    plugin_schema/1,
32
    plugin_i18n/1
33
]).
34

35
%% Package operations
36
-export([
37
    allow_installation/1,
38
    forget_allowed_installation/1,
39
    is_allowed_installation/1,
40

41
    ensure_installed/0,
42
    ensure_installed/1,
43
    ensure_installed/2,
44
    ensure_uninstalled/1,
45
    ensure_enabled/1,
46
    ensure_enabled/2,
47
    ensure_enabled/3,
48
    ensure_disabled/1,
49
    purge/1,
50
    write_package/2,
51
    is_package_present/1,
52
    delete_package/1
53
]).
54

55
%% Plugin runtime management
56
-export([
57
    ensure_started/0,
58
    ensure_started/1,
59
    ensure_stopped/0,
60
    ensure_stopped/1,
61
    restart/1,
62
    list/0,
63
    list/1
64
]).
65

66
%% Plugin config APIs
67
-export([
68
    get_config/1,
69
    get_config/2,
70
    update_config/2
71
]).
72

73
%% Package utils
74
-export([
75
    decode_plugin_config_map/2
76
]).
77

78
%% `emqx_config_handler' API
79
-export([
80
    post_config_update/5
81
]).
82

83
%% internal RPC targets
84
-export([
85
    get_tar/1,
86
    get_config/3
87
]).
88

89
%% for test cases
90
-export([put_config_internal/2]).
91

92
-ifdef(TEST).
93
-compile(export_all).
94
-compile(nowarn_export_all).
95
-endif.
96

97
%% Defines
98
-define(PLUGIN_CONFIG_PT_KEY(NameVsn), {?MODULE, NameVsn}).
99

100
-define(CATCH(BODY), catch_errors(atom_to_list(?FUNCTION_NAME), fun() -> BODY end)).
101

102
-define(APP, emqx_plugins).
103

104
-define(allowed_installations, allowed_installations).
105

106
%%--------------------------------------------------------------------
107
%% APIs
108
%%--------------------------------------------------------------------
109

110
%% @doc Describe a plugin.
111
-spec describe(name_vsn()) -> {ok, emqx_plugins_info:t()} | {error, any()}.
112
describe(NameVsn) ->
113
    read_plugin_info(NameVsn, #{fill_readme => true}).
29✔
114

115
-spec plugin_schema(name_vsn()) -> {ok, schema_json_map()} | {error, any()}.
116
plugin_schema(NameVsn) ->
117
    ?CATCH(emqx_plugins_fs:read_avsc_map(NameVsn)).
×
118

119
-spec plugin_i18n(name_vsn()) -> {ok, i18n_json_map()} | {error, any()}.
120
plugin_i18n(NameVsn) ->
121
    ?CATCH(emqx_plugins_fs:read_i18n(NameVsn)).
×
122

123
%% Note: this is only used for the HTTP API.
124
%% We could use `application:set_env', but the typespec for it makes dialyzer sad when it
125
%% sees a non-atom key...
126
-spec allow_installation(binary() | string()) -> ok.
127
allow_installation(NameVsn0) ->
128
    NameVsn = bin(NameVsn0),
10✔
129
    Allowed0 = application:get_env(?APP, ?allowed_installations, #{}),
10✔
130
    Allowed = Allowed0#{NameVsn => true},
10✔
131
    application:set_env(?APP, ?allowed_installations, Allowed),
10✔
132
    ok.
10✔
133

134
%% Note: this is only used for the HTTP API.
135
-spec is_allowed_installation(binary() | string()) -> boolean().
136
is_allowed_installation(NameVsn0) ->
137
    NameVsn = bin(NameVsn0),
10✔
138
    Allowed = application:get_env(?APP, ?allowed_installations, #{}),
10✔
139
    maps:get(NameVsn, Allowed, false).
10✔
140

141
%% Note: this is only used for the HTTP API.
142
-spec forget_allowed_installation(binary() | string()) -> ok.
143
forget_allowed_installation(NameVsn0) ->
144
    NameVsn = bin(NameVsn0),
6✔
145
    Allowed0 = application:get_env(?APP, ?allowed_installations, #{}),
6✔
146
    Allowed = maps:remove(NameVsn, Allowed0),
6✔
147
    application:set_env(?APP, ?allowed_installations, Allowed),
6✔
148
    ok.
6✔
149

150
%%--------------------------------------------------------------------
151
%% Package operations
152

153
%% @doc Ensure all configured plugins are installed.
154
-spec ensure_installed() -> ok.
155
ensure_installed() ->
156
    Fun = fun(#{name_vsn := NameVsn}) ->
753✔
157
        case ensure_installed(NameVsn) of
11✔
158
            ok -> [];
11✔
159
            {error, Reason} -> [{NameVsn, Reason}]
×
160
        end
161
    end,
162
    ok = for_plugins(Fun).
753✔
163

164
%% @doc
165
%% * Install a .tar.gz package placed in install_dir
166
%% * Configure the plugin
167
-spec ensure_installed(name_vsn()) -> ok | {error, map()}.
168
ensure_installed(NameVsn) ->
169
    case read_plugin_info(NameVsn, #{}) of
28✔
170
        {ok, #{running_status := RunningSt}} ->
171
            configure(NameVsn, ?normal, RunningSt);
8✔
172
        {error, _} ->
173
            ok = purge(NameVsn),
20✔
174
            install_and_configure(NameVsn, ?normal, stopped)
20✔
175
    end.
176

177
ensure_installed(NameVsn, ?fresh_install = Mode) ->
178
    %% TODO
179
    %% Additionally check if the plugin is actually stopped/uninstalled.
180
    %% Currently, external layers (API, CLI) are responsible for
181
    %% not allowing to install a plugin that is already installed.
182
    install_and_configure(NameVsn, Mode, stopped).
11✔
183

184
%% @doc Ensure files and directories for the given plugin are deleted.
185
%% If a plugin is running, or enabled, an error is returned.
186
-spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
187
ensure_uninstalled(NameVsn) ->
188
    case read_plugin_info(NameVsn, #{}) of
23✔
189
        {ok, #{running_status := running}} ->
190
            {error, #{
2✔
191
                msg => "bad_plugin_running_status",
192
                hint => "stop_the_plugin_first"
193
            }};
194
        {ok, #{config_status := enabled}} ->
195
            {error, #{
1✔
196
                msg => "bad_plugin_config_status",
197
                hint => "disable_the_plugin_first"
198
            }};
199
        {ok, Plugin} ->
200
            ok = emqx_plugins_apps:unload(Plugin),
15✔
201
            ok = purge(NameVsn),
15✔
202
            ensure_delete_state(NameVsn);
15✔
203
        {error, _Reason} ->
204
            ensure_delete_state(NameVsn)
5✔
205
    end.
206

207
%% @doc Ensure a plugin is enabled; a plugin that is not yet configured is appended to the end of the plugins list.
208
-spec ensure_enabled(name_vsn()) -> ok | {error, any()}.
209
ensure_enabled(NameVsn) ->
210
    ensure_enabled(NameVsn, no_move).
16✔
211

212
%% @doc Ensure a plugin is enabled at the given position of the plugin list.
213
-spec ensure_enabled(name_vsn(), position()) -> ok | {error, any()}.
214
ensure_enabled(NameVsn, Position) ->
215
    ensure_state(NameVsn, Position, _Enabled = true, _ConfLocation = local).
17✔
216

217
-spec ensure_enabled(name_vsn(), position(), local | global) -> ok | {error, any()}.
218
ensure_enabled(NameVsn, Position, ConfLocation) when
219
    ConfLocation =:= local; ConfLocation =:= global
220
->
221
    ensure_state(NameVsn, Position, _Enabled = true, ConfLocation).
4✔
222

223
%% @doc Ensure a plugin is disabled.
224
-spec ensure_disabled(name_vsn()) -> ok | {error, any()}.
225
ensure_disabled(NameVsn) ->
226
    ensure_state(NameVsn, no_move, false, _ConfLocation = local).
12✔
227

228
%% @doc Delete extracted dir
229
%% In case one lib is shared by multiple plugins,
230
%% it might be the case that purging one plugin's install dir
231
%% will cause deletion of loaded beams.
232
%% It should not be a problem, because shared lib should
233
%% reside in all the plugin install dirs.
234
-spec purge(name_vsn()) -> ok.
235
purge(NameVsn) ->
236
    ok = delete_cached_config(NameVsn),
39✔
237
    emqx_plugins_fs:purge_installed(NameVsn).
39✔
238

239
%% @doc Write the package file.
240
-spec write_package(name_vsn(), binary()) -> ok.
241
write_package(NameVsn, Bin) ->
242
    emqx_plugins_fs:write_tar(NameVsn, Bin).
9✔
243

244
%% @doc Check if the package file is present.
245
-spec is_package_present(name_vsn()) -> false | {true, [file:filename()]}.
246
is_package_present(NameVsn) ->
247
    emqx_plugins_fs:is_tar_present(NameVsn).
10✔
248

249
%% @doc Delete the package file.
250
-spec delete_package(name_vsn()) -> ok.
251
delete_package(NameVsn) ->
252
    _ = emqx_plugins_serde:delete_schema(NameVsn),
13✔
253
    emqx_plugins_fs:delete_tar(NameVsn).
13✔
254

255
%%--------------------------------------------------------------------
256
%% Plugin runtime management
257

258
%% @doc Ensure all configured and enabled plugins are started.
259
-spec ensure_started() -> ok.
260
ensure_started() ->
261
    Fun = fun
755✔
262
        (#{name_vsn := NameVsn, enable := true}) ->
263
            case ?CATCH(do_ensure_started(NameVsn)) of
14✔
264
                ok -> [];
13✔
265
                {error, Reason} -> [{NameVsn, Reason}]
1✔
266
            end;
267
        (#{name_vsn := NameVsn, enable := false}) ->
268
            ?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}),
1✔
269
            []
1✔
270
    end,
271
    ok = for_plugins(Fun).
755✔
272

273
%% @doc Start a plugin from Management API or CLI.
274
%% the input is a <name>-<vsn> string.
275
-spec ensure_started(name_vsn()) -> ok | {error, term()}.
276
ensure_started(NameVsn) ->
277
    case ?CATCH(do_ensure_started(NameVsn)) of
21✔
278
        ok ->
279
            ok;
21✔
280
        {error, ReasonMap} ->
281
            ?SLOG(error, ReasonMap#{msg => "failed_to_start_plugin"}),
×
282
            {error, ReasonMap}
×
283
    end.
284

285
%% @doc Stop all plugins before broker stops.
286
-spec ensure_stopped() -> ok.
287
ensure_stopped() ->
288
    Fun = fun
6✔
289
        (#{name_vsn := NameVsn, enable := true}) ->
290
            case ensure_stopped(NameVsn) of
6✔
291
                ok ->
292
                    [];
6✔
293
                {error, Reason} ->
294
                    [{NameVsn, Reason}]
×
295
            end;
296
        (#{name_vsn := NameVsn, enable := false}) ->
297
            ?SLOG(debug, #{msg => "plugin_disabled", action => stop_plugin, name_vsn => NameVsn}),
1✔
298
            []
1✔
299
    end,
300
    ok = for_plugins(Fun).
6✔
301

302
%% @doc Stop a plugin from Management API or CLI.
303
-spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
304
ensure_stopped(NameVsn) ->
305
    ?CATCH(do_ensure_stopped(NameVsn)).
24✔
306

307
do_ensure_stopped(NameVsn) ->
308
    case emqx_plugins_info:read(NameVsn) of
24✔
309
        {ok, Plugin} ->
310
            emqx_plugins_apps:stop(Plugin);
23✔
311
        {error, Reason} ->
312
            {error, Reason}
1✔
313
    end.
314

315
-spec get_config(name_vsn()) -> plugin_config_map().
316
get_config(NameVsn) ->
317
    get_config(NameVsn, #{}).
5✔
318

319
-spec get_config(name_vsn(), term()) -> plugin_config_map() | term().
320
get_config(NameVsn, Default) ->
321
    get_cached_config(NameVsn, Default).
30✔
322

323
%% @doc Update plugin's config.
324
%% RPC call from Management API or CLI.
325
%% NOTE
326
%% This function assumes that the config is already validated
327
%% by avro schema in case of its presence.
328
update_config(NameVsn, Config) ->
329
    maybe
22✔
330
        {ok, Plugin} ?= emqx_plugins_info:read(NameVsn, #{}),
22✔
331
        ok ?= request_config_change(NameVsn, Plugin, Config),
18✔
332
        ok = emqx_plugins_local_config:backup_and_update(NameVsn, Config),
9✔
333
        ok = put_cached_config(NameVsn, Config)
9✔
334
    end.
335

336
%% @doc Stop and then start the plugin.
337
restart(NameVsn) ->
338
    case ensure_stopped(NameVsn) of
2✔
339
        ok -> ensure_started(NameVsn);
2✔
340
        {error, Reason} -> {error, Reason}
×
341
    end.
342

343
%% @doc List all installed plugins.
344
%% Including the ones that are installed, but not enabled in config.
345
-spec list() -> [emqx_plugins_info:t()].
346
list() ->
347
    list(normal).
67✔
348

349
-spec list(all | normal | hidden) -> [emqx_plugins_info:t()].
350
list(Type) ->
351
    All = lists:filtermap(
67✔
352
        fun(NameVsn) ->
353
            case read_plugin_info(NameVsn, #{}) of
42✔
354
                {ok, Info} ->
355
                    filter_plugin_of_type(Type, Info);
41✔
356
                {error, Reason} ->
357
                    ?SLOG(warning, Reason#{msg => "failed_to_read_plugin_info"}),
1✔
358
                    false
1✔
359
            end
360
        end,
361
        emqx_plugins_fs:list_name_vsn()
362
    ),
363
    do_list(configured(), All).
67✔
364

365
filter_plugin_of_type(all, Info) ->
366
    {true, Info};
×
367
filter_plugin_of_type(normal, #{hidden := true}) ->
368
    false;
×
369
filter_plugin_of_type(normal, Info) ->
370
    {true, Info};
41✔
371
filter_plugin_of_type(hidden, #{hidden := true} = Info) ->
372
    {true, Info};
×
373
filter_plugin_of_type(hidden, _Info) ->
374
    false.
×
375

376
%%--------------------------------------------------------------------
377
%% Package utils
378

379
-spec decode_plugin_config_map(name_vsn(), map() | binary()) ->
380
    {ok, map() | ?plugin_without_config_schema}
381
    | {error, term()}.
382
decode_plugin_config_map(NameVsn, AvroJsonMap) ->
383
    case has_avsc(NameVsn) of
7✔
384
        true ->
385
            case emqx_plugins_serde:decode(NameVsn, ensure_config_bin(AvroJsonMap)) of
1✔
386
                {ok, Config} ->
387
                    {ok, Config};
1✔
388
                {error, #{reason := plugin_serde_not_found}} ->
389
                    Reason = "plugin_config_schema_serde_not_found",
×
390
                    ?SLOG(error, #{
×
391
                        msg => Reason, name_vsn => NameVsn, plugin_with_avro_schema => true
392
                    }),
×
393
                    {error, Reason};
×
394
                {error, _} = Error ->
395
                    Error
×
396
            end;
397
        false ->
398
            ?SLOG(debug, #{
6✔
399
                msg => "plugin_without_config_schema",
400
                name_vsn => NameVsn
401
            }),
6✔
402
            {ok, ?plugin_without_config_schema}
6✔
403
    end.
404

405
-spec has_avsc(name_vsn()) -> boolean().
406
has_avsc(NameVsn) ->
407
    case read_plugin_info(NameVsn, #{fill_readme => false}) of
36✔
408
        {ok, #{with_config_schema := WithAvsc}} when is_boolean(WithAvsc) ->
409
            WithAvsc;
35✔
410
        _ ->
411
            false
1✔
412
    end.
413

414
get_config_internal(Key, Default) when is_atom(Key) ->
415
    get_config_internal([Key], Default);
1,709✔
416
get_config_internal(Path, Default) ->
417
    emqx_conf:get([?CONF_ROOT | Path], Default).
1,712✔
418

419
put_config_internal(Key, Value) ->
420
    do_put_config_internal(Key, Value, _ConfLocation = local).
21✔
421

422
%%--------------------------------------------------------------------
423
%% Internal
424
%%--------------------------------------------------------------------
425

426
ensure_delete_state(NameVsn0) ->
427
    NameVsn = bin(NameVsn0),
20✔
428
    List = configured(),
20✔
429
    put_configured(lists:filter(fun(#{name_vsn := N1}) -> bin(N1) =/= NameVsn end, List)),
20✔
430
    ok.
20✔
431

432
ensure_state(NameVsn, Position, State, ConfLocation) when is_binary(NameVsn) ->
433
    ensure_state(binary_to_list(NameVsn), Position, State, ConfLocation);
34✔
434
ensure_state(NameVsn, Position, State, ConfLocation) ->
435
    case read_plugin_info(NameVsn, #{}) of
71✔
436
        {ok, _} ->
437
            Item = #{
69✔
438
                name_vsn => NameVsn,
439
                enable => State
440
            },
441
            ?CATCH(ensure_configured(Item, Position, ConfLocation));
69✔
442
        {error, Reason} ->
443
            ?SLOG(error, #{msg => "ensure_plugin_states_failed", reason => Reason}),
2✔
444
            {error, Reason}
2✔
445
    end.
446

447
ensure_configured(#{name_vsn := NameVsn} = Item, Position, ConfLocation) ->
448
    Configured = configured(),
69✔
449
    SplitFun = fun(#{name_vsn := NV}) -> bin(NV) =/= bin(NameVsn) end,
69✔
450
    {Front, Rear} = lists:splitwith(SplitFun, Configured),
69✔
451
    try
69✔
452
        NewConfigured =
69✔
453
            case Rear of
454
                [_ | More] when Position =:= no_move ->
455
                    Front ++ [Item | More];
41✔
456
                [_ | More] ->
457
                    add_new_configured(Front ++ More, Position, Item);
7✔
458
                [] ->
459
                    add_new_configured(Configured, Position, Item)
21✔
460
            end,
461
        ok = put_configured(NewConfigured, ConfLocation)
69✔
462
    catch
463
        throw:Reason ->
464
            {error, Reason}
×
465
    end.
466

467
add_new_configured(Configured, no_move, Item) ->
468
    %% default to rear
469
    add_new_configured(Configured, rear, Item);
19✔
470
add_new_configured(Configured, front, Item) ->
471
    [Item | Configured];
2✔
472
add_new_configured(Configured, rear, Item) ->
473
    Configured ++ [Item];
22✔
474
add_new_configured(Configured, {Action, NameVsn}, Item) ->
475
    SplitFun = fun(#{name_vsn := NV}) -> bin(NV) =/= bin(NameVsn) end,
4✔
476
    {Front, Rear} = lists:splitwith(SplitFun, Configured),
4✔
477
    Rear =:= [] andalso
4✔
478
        throw(#{
×
479
            msg => "position_anchor_plugin_not_configured",
480
            hint => "maybe_install_and_configure",
481
            name_vsn => NameVsn
482
        }),
483
    case Action of
4✔
484
        before ->
485
            Front ++ [Item | Rear];
2✔
486
        behind ->
487
            [Anchor | Rear0] = Rear,
2✔
488
            Front ++ [Anchor, Item | Rear0]
2✔
489
    end.
490

491
%% Make sure configured ones are ordered in front.
492
do_list([], All) ->
493
    All;
67✔
494
do_list([#{name_vsn := NameVsn} | Rest], All) ->
495
    SplitF = fun(#{name := Name, rel_vsn := Vsn}) ->
41✔
496
        bin([Name, "-", Vsn]) =/= bin(NameVsn)
46✔
497
    end,
498
    case lists:splitwith(SplitF, All) of
41✔
499
        {_, []} ->
500
            do_list(Rest, All);
×
501
        {Front, [I | Rear]} ->
502
            [I | do_list(Rest, Front ++ Rear)]
41✔
503
    end.
504

505
do_ensure_started(NameVsn) ->
506
    maybe
35✔
507
        ok ?= install(NameVsn, ?normal),
35✔
508
        {ok, Plugin} ?= emqx_plugins_info:read(NameVsn),
35✔
509
        ok ?= emqx_plugins_apps:start(Plugin)
35✔
510
    else
511
        {error, Reason} ->
512
            ?SLOG(error, #{
1✔
513
                msg => "failed_to_start_plugin",
514
                name_vsn => NameVsn,
515
                reason => Reason
516
            }),
1✔
517
            {error, Reason}
1✔
518
    end.
519

520
%%--------------------------------------------------------------------
521
%% RPC targets
522
%%--------------------------------------------------------------------
523

524
%% Redundant arguments are kept for backward compatibility.
525
-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP, any()) ->
526
    {ok, plugin_config_map() | term()}.
527
get_config(NameVsn, ?CONFIG_FORMAT_MAP, Default) ->
528
    {ok, get_config(NameVsn, Default)}.
25✔
529

530
get_tar(NameVsn) ->
531
    emqx_plugins_fs:get_tar(NameVsn).
3✔
532

533
%%--------------------------------------------------------------------
534
%% Internal functions
535
%%--------------------------------------------------------------------
536

537
%% @doc Converts exception errors into `{error, Reason}` return values
538
catch_errors(Label, F) ->
539
    try
128✔
540
        F()
128✔
541
    catch
542
        error:Reason:Stacktrace ->
543
            %% unexpected errors, log stacktrace
544
            ?SLOG(warning, #{
×
545
                msg => "plugin_op_failed",
546
                which_op => Label,
547
                exception => Reason,
548
                stacktrace => Stacktrace
549
            }),
×
550
            {error, #{
×
551
                which_op => Label,
552
                exception => Reason,
553
                stacktrace => Stacktrace
554
            }}
555
    end.
556

557
%% read plugin info from the JSON file
558
%% returns {ok, Info} or {error, Reason}
559
read_plugin_info(NameVsn, Options) ->
560
    emqx_plugins_info:read(NameVsn, Options).
231✔
561

562
ensure_installed_locally(NameVsn) ->
563
    emqx_plugins_fs:ensure_installed_from_tar(
69✔
564
        NameVsn,
565
        fun() ->
566
            case emqx_plugins_info:read(NameVsn) of
29✔
567
                {ok, Plugin} ->
568
                    emqx_plugins_apps:load(Plugin, emqx_plugins_fs:lib_dir(NameVsn));
26✔
569
                {error, _} = Error ->
570
                    Error
3✔
571
            end
572
        end
573
    ).
574

575
install_and_configure(NameVsn, Mode, RunningSt) ->
576
    maybe
31✔
577
        ok ?= install(NameVsn, Mode),
31✔
578
        configure(NameVsn, Mode, RunningSt)
26✔
579
    end.
580

581
configure(NameVsn, Mode, RunningSt) ->
582
    ok = load_config_schema(NameVsn),
34✔
583
    maybe
34✔
584
        ok ?= ensure_local_config(NameVsn, Mode),
34✔
585
        configure_from_local_config(NameVsn, RunningSt)
29✔
586
    end,
587
    ensure_state(NameVsn).
34✔
588

589
%% Install from local tarball or get tarball from cluster
590
install(NameVsn, Mode) ->
591
    case {ensure_installed_locally(NameVsn), Mode} of
66✔
592
        {ok, _} ->
593
            ok;
58✔
594
        {{error, #{reason := plugin_tarball_not_found}}, ?normal} ->
595
            case get_from_cluster(NameVsn) of
4✔
596
                ok ->
597
                    ensure_installed_locally(NameVsn);
3✔
598
                {error, Reason} ->
599
                    {error, Reason}
1✔
600
            end;
601
        {{error, Reason}, _} ->
602
            {error, Reason}
4✔
603
    end.
604

605
get_from_cluster(NameVsn) ->
606
    Nodes = [N || N <- mria:running_nodes(), N /= node()],
4✔
607
    case get_plugin_tar_from_any_node(Nodes, NameVsn, []) of
4✔
608
        {ok, TarContent} ->
609
            emqx_plugins_fs:write_tar(NameVsn, TarContent);
3✔
610
        {error, NodeErrors} when Nodes =/= [] ->
611
            ErrMeta = #{
×
612
                msg => "failed_to_copy_plugin_from_other_nodes",
613
                name_vsn => NameVsn,
614
                node_errors => NodeErrors,
615
                reason => plugin_not_found
616
            },
617
            ?SLOG(error, ErrMeta),
×
618
            {error, ErrMeta};
×
619
        {error, _} ->
620
            ErrMeta = #{
1✔
621
                msg => "no_nodes_to_copy_plugin_from",
622
                name_vsn => NameVsn,
623
                reason => plugin_not_found
624
            },
625
            ?SLOG(error, ErrMeta),
1✔
626
            {error, ErrMeta}
1✔
627
    end.
628

629
get_plugin_tar_from_any_node([], _NameVsn, Errors) ->
630
    {error, Errors};
1✔
631
get_plugin_tar_from_any_node([Node | T], NameVsn, Errors) ->
632
    case emqx_plugins_proto_v2:get_tar(Node, NameVsn, infinity) of
3✔
633
        {ok, _} = Res ->
634
            ?SLOG(debug, #{
3✔
635
                msg => "get_plugin_tar_from_cluster_successfully",
636
                node => Node,
637
                name_vsn => NameVsn
638
            }),
3✔
639
            Res;
3✔
640
        Err ->
641
            get_plugin_tar_from_any_node(T, NameVsn, [{Node, Err} | Errors])
×
642
    end.
643

644
get_plugin_config_from_any_node([], _NameVsn, Errors) ->
645
    {error, Errors};
14✔
646
get_plugin_config_from_any_node([Node | RestNodes], NameVsn, Errors) ->
647
    case
25✔
648
        emqx_plugins_proto_v2:get_config(
649
            Node, NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found, 5_000
650
        )
651
    of
652
        {ok, ?plugin_conf_not_found} ->
653
            Err = {error, {config_not_found_on_node, Node, NameVsn}},
15✔
654
            get_plugin_config_from_any_node(RestNodes, NameVsn, [{Node, Err} | Errors]);
15✔
655
        {ok, _} = Res ->
656
            ?SLOG(debug, #{
10✔
657
                msg => "get_plugin_config_from_cluster_successfully",
658
                node => Node,
659
                name_vsn => NameVsn
660
            }),
10✔
661
            Res;
10✔
662
        Err ->
663
            get_plugin_config_from_any_node(RestNodes, NameVsn, [{Node, Err} | Errors])
×
664
    end.
665

666
do_put_config_internal(Key, Value, ConfLocation) when is_atom(Key) ->
667
    do_put_config_internal([Key], Value, ConfLocation);
141✔
668
do_put_config_internal(Path, Values, _ConfLocation = local) when is_list(Path) ->
669
    Opts = #{rawconf_with_defaults => true, override_to => cluster},
108✔
670
    %% Already in cluster_rpc, don't use emqx_conf:update, dead calls
671
    case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of
108✔
672
        {ok, _} -> ok;
102✔
673
        Error -> Error
6✔
674
    end;
675
do_put_config_internal(Path, Values, _ConfLocation = global) when is_list(Path) ->
676
    Opts = #{rawconf_with_defaults => true, override_to => cluster},
38✔
677
    case emqx_conf:update([?CONF_ROOT | Path], bin_key(Values), Opts) of
38✔
678
        {ok, _} -> ok;
38✔
679
        Error -> Error
×
680
    end.
681

682
%%--------------------------------------------------------------------
683
%% `emqx_config_handler' API
684
%%--------------------------------------------------------------------
685

686
post_config_update([?CONF_ROOT], _Req, #{states := NewStates}, #{states := OldStates}, _Envs) ->
687
    NewStatesIndex = maps:from_list([{NV, S} || S = #{name_vsn := NV} <- NewStates]),
2✔
688
    OldStatesIndex = maps:from_list([{NV, S} || S = #{name_vsn := NV} <- OldStates]),
2✔
689
    #{changed := Changed} = emqx_utils_maps:diff_maps(NewStatesIndex, OldStatesIndex),
2✔
690
    maps:foreach(fun enable_disable_plugin/2, Changed),
2✔
691
    ok;
2✔
692
post_config_update(_Path, _Req, _NewConf, _OldConf, _Envs) ->
693
    ok.
×
694

695
enable_disable_plugin(NameVsn, {#{enable := true}, #{enable := false}}) ->
696
    %% errors are already logged in this fn
697
    _ = ensure_stopped(NameVsn),
1✔
698
    ok;
1✔
699
enable_disable_plugin(NameVsn, {#{enable := false}, #{enable := true}}) ->
700
    %% errors are already logged in this fn
701
    _ = ensure_started(NameVsn),
1✔
702
    ok;
1✔
703
enable_disable_plugin(_NameVsn, _Diff) ->
704
    ok.
×
705

706
%%--------------------------------------------------------------------
707
%% Helper functions
708
%%--------------------------------------------------------------------
709

710
put_configured(Configured) ->
711
    put_configured(Configured, _ConfLocation = local).
56✔
712

713
put_configured(Configured, ConfLocation) ->
714
    ok = do_put_config_internal(states, bin_key(Configured), ConfLocation).
125✔
715

716
configured() ->
717
    get_config_internal(states, []).
1,709✔
718

719
for_plugins(ActionFun) ->
720
    case lists:flatmap(ActionFun, configured()) of
1,514✔
721
        [] ->
722
            ok;
1,513✔
723
        Errors ->
724
            ErrMeta = #{function => ActionFun, errors => Errors},
1✔
725
            ?tp(
1✔
726
                for_plugins_action_error_occurred,
727
                ErrMeta
728
            ),
729
            ?SLOG(error, ErrMeta#{msg => "for_plugins_action_error_occurred"}),
1✔
730
            ok
1✔
731
    end.
732

733
ensure_state(NameVsn) ->
734
    EnsureStateFun = fun(#{name_vsn := NV, enable := Bool}, AccIn) ->
34✔
735
        case NV of
18✔
736
            NameVsn ->
737
                %% Configured, using existing cluster config
738
                _ = ensure_state(NV, no_move, Bool, global),
15✔
739
                AccIn#{ensured => true};
15✔
740
            _ ->
741
                AccIn
3✔
742
        end
743
    end,
744
    case lists:foldl(EnsureStateFun, #{ensured => false}, configured()) of
34✔
745
        #{ensured := true} ->
746
            ok;
15✔
747
        #{ensured := false} ->
748
            ?SLOG(info, #{msg => "plugin_not_configured", name_vsn => NameVsn}),
19✔
749
            %% Clean installation, no config, ensure with `Enable = false`
750
            _ = ensure_state(NameVsn, no_move, false, global),
19✔
751
            ok
19✔
752
    end,
753
    ok.
34✔
754

755
ensure_local_config(NameVsn, Mode) ->
756
    case emqx_plugins_fs:ensure_config_dir(NameVsn) of
34✔
757
        ok ->
758
            %% get config from other nodes or get from tarball
759
            do_ensure_local_config(NameVsn, Mode);
34✔
760
        {error, _} = Error ->
761
            ?SLOG(warning, #{
×
762
                msg => "failed_to_ensure_config_dir", name_vsn => NameVsn, reason => Error
763
            }),
×
764
            Error
×
765
    end.
766

767
do_ensure_local_config(NameVsn, ?fresh_install) ->
768
    emqx_plugins_local_config:copy_default(NameVsn);
10✔
769
do_ensure_local_config(NameVsn, ?normal) ->
770
    Nodes = mria:running_nodes(),
24✔
771
    case get_plugin_config_from_any_node(Nodes, NameVsn, []) of
24✔
772
        {ok, Config} when is_map(Config) ->
773
            emqx_plugins_local_config:update(NameVsn, Config);
10✔
774
        {error, Reason} ->
775
            ?SLOG(warning, #{
14✔
776
                msg => "failed_to_get_plugin_config_from_cluster",
777
                name_vsn => NameVsn,
778
                reason => Reason
779
            }),
14✔
780
            emqx_plugins_local_config:copy_default(NameVsn)
14✔
781
    end.
782

783
configure_from_local_config(NameVsn, RunningSt) ->
784
    case validated_local_config(NameVsn) of
29✔
785
        {ok, NewConfig} ->
786
            OldConfig = get_cached_config(NameVsn),
29✔
787
            ok = notify_config_change(NameVsn, OldConfig, NewConfig, RunningSt),
29✔
788
            ok = put_cached_config(NameVsn, NewConfig);
29✔
789
        {error, Reason} ->
790
            ?SLOG(warning, #{
×
791
                msg => "failed_to_validate_plugin_config", name_vsn => NameVsn, reason => Reason
792
            }),
×
793
            ok
×
794
    end.
795

796
notify_config_change(_NameVsn, _OldConfig, _NewConfig, stopped) ->
797
    ok;
22✔
798
notify_config_change(NameVsn, OldConfig, NewConfig, RunningSt) when
799
    RunningSt =:= running orelse RunningSt =:= loaded
800
->
801
    %% NOTE
802
    %% The new config here is the local config, so it is
803
    %% * either vendored with the plugin;
804
    %% * or fetched from another node where it was validated by the plugin before being updated.
805
    %% In both cases, we do not expect the plugin to reject the config.
806
    %% Even if it does, there is little that can be done.
807
    %% So we ignore the result of the callback.
808
    _ = emqx_plugins_apps:on_config_changed(NameVsn, OldConfig, NewConfig),
7✔
809
    ok.
7✔
810

811
request_config_change(NameVsn, #{running_status := stopped}, _Config) ->
NEW
812
    {error, {plugin_apps_not_loaded, NameVsn}};
×
813
request_config_change(NameVsn, #{running_status := RunningSt}, Config) when
814
    RunningSt =:= running orelse RunningSt =:= loaded
815
->
816
    emqx_plugins_apps:on_config_changed(NameVsn, get_cached_config(NameVsn), Config).
18✔
817

818
load_config_schema(NameVsn) ->
819
    case emqx_plugins_fs:read_avsc_bin(NameVsn) of
34✔
820
        {ok, AvscBin} ->
821
            _ = emqx_plugins_serde:add_schema(bin(NameVsn), AvscBin),
1✔
822
            ok;
1✔
823
        {error, _} ->
824
            ok
33✔
825
    end.
826

827
validated_local_config(NameVsn) ->
828
    case emqx_plugins_local_config:read(NameVsn) of
29✔
829
        {ok, Config} ->
830
            case has_avsc(NameVsn) of
29✔
831
                true ->
832
                    case decode_plugin_config_map(NameVsn, Config) of
1✔
833
                        {ok, _} ->
834
                            {ok, Config};
1✔
835
                        {error, Reason} ->
836
                            ?SLOG(error, #{
×
837
                                msg => "plugin_config_validation_failed",
838
                                name_vsn => NameVsn,
839
                                reason => Reason
840
                            }),
×
841
                            {error, Reason}
×
842
                    end;
843
                false ->
844
                    {ok, Config}
28✔
845
            end;
846
        {error, Reason} ->
847
            ?SLOG(warning, #{
×
848
                msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn, reason => Reason
849
            }),
×
850
            {error, Reason}
×
851
    end.
852

853
get_cached_config(NameVsn) ->
854
    get_cached_config(NameVsn, #{}).
47✔
855

856
get_cached_config(NameVsn, Default) ->
857
    persistent_term:get(?PLUGIN_CONFIG_PT_KEY(bin(NameVsn)), Default).
77✔
858

859
put_cached_config(NameVsn, Config) ->
860
    persistent_term:put(?PLUGIN_CONFIG_PT_KEY(bin(NameVsn)), Config).
38✔
861

862
delete_cached_config(NameVsn) ->
863
    _ = persistent_term:erase(?PLUGIN_CONFIG_PT_KEY(bin(NameVsn))),
39✔
864
    ok.
39✔
865

866
ensure_config_bin(AvroJsonMap) when is_map(AvroJsonMap) ->
867
    emqx_utils_json:encode(AvroJsonMap);
1✔
868
ensure_config_bin(AvroJsonBin) when is_binary(AvroJsonBin) ->
869
    AvroJsonBin.
×
870

871
bin_key(Map) when is_map(Map) ->
872
    maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map);
189✔
873
bin_key(List = [#{} | _]) ->
874
    lists:map(fun(M) -> bin_key(M) end, List);
149✔
875
bin_key(Term) ->
876
    Term.
122✔
877

878
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
194✔
879
bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
361✔
880
bin(B) when is_binary(B) -> B.
261✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc