• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 14194765223

01 Apr 2025 11:46AM UTC coverage: 83.423%. First build
14194765223

Pull #14957

github

web-flow
Merge 94ec173ec into 1f2774e5a
Pull Request #14957: feat(emqx_plugins): respect plugin's `on_config_changed` callback response

69 of 84 new or added lines in 3 files covered. (82.14%)

62042 of 74370 relevant lines covered (83.42%)

16908.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.29
/apps/emqx_management/src/emqx_mgmt_api_plugins.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16
-module(emqx_mgmt_api_plugins).
17

18
-behaviour(minirest_api).
19

20
-include_lib("typerefl/include/types.hrl").
21
-include_lib("emqx/include/logger.hrl").
22
-include_lib("emqx_plugins/include/emqx_plugins.hrl").
23
-include_lib("erlavro/include/erlavro.hrl").
24

25
-export([
26
    api_spec/0,
27
    fields/1,
28
    paths/0,
29
    schema/1,
30
    namespace/0
31
]).
32

33
-export([
34
    list_plugins/2,
35
    upload_install/2,
36
    plugin/2,
37
    update_plugin/2,
38
    plugin_config/2,
39
    plugin_schema/2,
40
    update_boot_order/2
41
]).
42

43
-export([
44
    validate_name/1,
45
    validate_file_name/2,
46
    get_plugins/0,
47
    install_package/2,
48
    install_package_v4/2,
49
    delete_package/1,
50
    delete_package/2,
51
    describe_package/1,
52
    ensure_action/2,
53
    ensure_action/3,
54
    do_update_plugin_config/3,
55
    do_update_plugin_config_v4/2
56
]).
57

58
-define(NAME_RE, "^[A-Za-z]+\\w*\\-[\\w-.]*$").
59
-define(TAGS, [<<"Plugins">>]).
60

61
-define(CONTENT_PLUGIN, plugin).
62

63
namespace() ->
64
    "plugins".
5,508✔
65

66
api_spec() ->
67
    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
324✔
68

69
%% Don't change the path's order
70
paths() ->
71
    [
72
        "/plugins",
324✔
73
        "/plugins/:name",
74
        "/plugins/install",
75
        "/plugins/:name/:action",
76
        "/plugins/:name/config",
77
        "/plugins/:name/schema",
78
        "/plugins/:name/move"
79
    ].
80

81
schema("/plugins") ->
82
    #{
329✔
83
        'operationId' => list_plugins,
84
        get => #{
85
            summary => <<"List all installed plugins">>,
86
            description =>
87
                "Plugins are launched in top-down order.<br/>"
88
                "Use `POST /plugins/{name}/move` to change the boot order.",
89
            tags => ?TAGS,
90
            responses => #{
91
                200 => hoconsc:array(hoconsc:ref(plugin))
92
            }
93
        }
94
    };
95
schema("/plugins/install") ->
96
    #{
335✔
97
        'operationId' => upload_install,
98
        filter => fun ?MODULE:validate_file_name/2,
99
        post => #{
100
            summary => <<"Install a new plugin">>,
101
            description =>
102
                "Upload a plugin tarball (plugin-vsn.tar.gz)."
103
                "Follow [emqx-plugin-template](https://github.com/emqx/emqx-plugin-template) "
104
                "to develop plugin.",
105
            tags => ?TAGS,
106
            'requestBody' => #{
107
                content => #{
108
                    'multipart/form-data' => #{
109
                        schema => #{
110
                            type => object,
111
                            properties => #{
112
                                ?CONTENT_PLUGIN => #{type => string, format => binary}
113
                            }
114
                        },
115
                        encoding => #{?CONTENT_PLUGIN => #{'contentType' => 'application/gzip'}}
116
                    }
117
                }
118
            },
119
            responses => #{
120
                204 => <<"Install plugin successfully">>,
121
                400 => emqx_dashboard_swagger:error_codes(
122
                    [
123
                        'UNEXPECTED_ERROR',
124
                        'ALREADY_INSTALLED',
125
                        'BAD_PLUGIN_INFO',
126
                        'BAD_FORM_DATA',
127
                        'FORBIDDEN'
128
                    ]
129
                )
130
            }
131
        }
132
    };
133
schema("/plugins/:name") ->
134
    #{
334✔
135
        'operationId' => plugin,
136
        get => #{
137
            summary => <<"Get a plugin description">>,
138
            description => "Describe a plugin according to its `release.json` and `README.md`.",
139
            tags => ?TAGS,
140
            parameters => [hoconsc:ref(name)],
141
            responses => #{
142
                200 => hoconsc:ref(plugin),
143
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
144
            }
145
        },
146
        delete => #{
147
            summary => <<"Delete a plugin">>,
148
            description => "Uninstalls a previously uploaded plugin package.",
149
            tags => ?TAGS,
150
            parameters => [hoconsc:ref(name)],
151
            responses => #{
152
                204 => <<"Uninstall successfully">>,
153
                400 => emqx_dashboard_swagger:error_codes(['PARAM_ERROR'], <<"Bad parameter">>),
154
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
155
            }
156
        }
157
    };
158
schema("/plugins/:name/:action") ->
159
    #{
330✔
160
        'operationId' => update_plugin,
161
        put => #{
162
            summary => <<"Trigger action on an installed plugin">>,
163
            description =>
164
                "start/stop a installed plugin.<br/>"
165
                "- **start**: start the plugin.<br/>"
166
                "- **stop**: stop the plugin.<br/>",
167
            tags => ?TAGS,
168
            parameters => [
169
                hoconsc:ref(name),
170
                {action, hoconsc:mk(hoconsc:enum([start, stop]), #{desc => "Action", in => path})}
171
            ],
172
            responses => #{
173
                204 => <<"Trigger action successfully">>,
174
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
175
            }
176
        }
177
    };
178
schema("/plugins/:name/config") ->
179
    #{
330✔
180
        'operationId' => plugin_config,
181
        get => #{
182
            summary => <<"Get plugin config">>,
183
            description =>
184
                "Get plugin config. Config schema is defined by user's schema.avsc file.<br/>",
185
            tags => ?TAGS,
186
            parameters => [hoconsc:ref(name)],
187
            responses => #{
188
                %% avro data, json encoded
189
                200 => hoconsc:mk(binary()),
190
                400 => emqx_dashboard_swagger:error_codes(
191
                    ['BAD_CONFIG'], <<"Plugin Config Not Found">>
192
                ),
193
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
194
            }
195
        },
196
        put => #{
197
            summary =>
198
                <<"Update plugin config">>,
199
            description =>
200
                "Update plugin config. Config schema defined by user's schema.avsc file.<br/>",
201
            tags => ?TAGS,
202
            parameters => [hoconsc:ref(name)],
203
            'requestBody' => #{
204
                content => #{
205
                    'application/json' => #{
206
                        schema => #{
207
                            type => object
208
                        }
209
                    }
210
                }
211
            },
212
            responses => #{
213
                204 => <<"Config updated successfully">>,
214
                400 => emqx_dashboard_swagger:error_codes(
215
                    ['BAD_CONFIG', 'UNEXPECTED_ERROR'], <<"Update plugin config failed">>
216
                ),
217
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>),
218
                500 => emqx_dashboard_swagger:error_codes(['INTERNAL_ERROR'], <<"Internal Error">>)
219
            }
220
        }
221
    };
222
schema("/plugins/:name/schema") ->
223
    #{
324✔
224
        'operationId' => plugin_schema,
225
        get => #{
226
            summary => <<"Get installed plugin's AVRO schema">>,
227
            description => "Get plugin's config AVRO schema.",
228
            tags => ?TAGS,
229
            parameters => [hoconsc:ref(name)],
230
            responses => #{
231
                %% avro schema and i18n json object
232
                200 => hoconsc:mk(binary()),
233
                404 => emqx_dashboard_swagger:error_codes(
234
                    ['NOT_FOUND', 'FILE_NOT_EXISTED'],
235
                    <<"Plugin Not Found or Plugin not given a schema file">>
236
                )
237
            }
238
        }
239
    };
240
schema("/plugins/:name/move") ->
241
    #{
328✔
242
        'operationId' => update_boot_order,
243
        post => #{
244
            summary => <<"Move plugin within plugin hierarchy">>,
245
            description => "Setting the boot order of plugins.",
246
            tags => ?TAGS,
247
            parameters => [hoconsc:ref(name)],
248
            'requestBody' => move_request_body(),
249
            responses => #{
250
                204 => <<"Boot order changed successfully">>,
251
                400 => emqx_dashboard_swagger:error_codes(['MOVE_FAILED'], <<"Move failed">>)
252
            }
253
        }
254
    }.
255

256
fields(plugin) ->
257
    [
258
        {name,
324✔
259
            hoconsc:mk(
260
                binary(),
261
                #{
262
                    desc => "Name-Vsn: without .tar.gz",
263
                    validator => fun ?MODULE:validate_name/1,
264
                    required => true,
265
                    example => "emqx_plugin_template-5.0-rc.1"
266
                }
267
            )},
268
        {author, hoconsc:mk(list(string()), #{example => [<<"EMQX Team">>]})},
269
        {builder, hoconsc:ref(?MODULE, builder)},
270
        {built_on_otp_release, hoconsc:mk(string(), #{example => "24"})},
271
        {compatibility, hoconsc:mk(map(), #{example => #{<<"emqx">> => <<"~>5.0">>}})},
272
        {git_commit_or_build_date,
273
            hoconsc:mk(string(), #{
274
                example => "2021-12-25",
275
                desc =>
276
                    "Last git commit date by `git log -1 --pretty=format:'%cd' "
277
                    "--date=format:'%Y-%m-%d`.\n"
278
                    " If the last commit date is not available, the build date will be presented."
279
            })},
280
        {functionality, hoconsc:mk(hoconsc:array(string()), #{example => [<<"Demo">>]})},
281
        {git_ref, hoconsc:mk(string(), #{example => "ddab50fafeed6b1faea70fc9ffd8c700d7e26ec1"})},
282
        {metadata_vsn, hoconsc:mk(string(), #{example => "0.1.0"})},
283
        {rel_vsn,
284
            hoconsc:mk(
285
                binary(),
286
                #{
287
                    desc => "Plugins release version",
288
                    required => true,
289
                    example => <<"5.0-rc.1">>
290
                }
291
            )},
292
        {rel_apps,
293
            hoconsc:mk(
294
                hoconsc:array(binary()),
295
                #{
296
                    desc => "Aplications in plugin.",
297
                    required => true,
298
                    example => [<<"emqx_plugin_template-5.0.0">>, <<"map_sets-1.1.0">>]
299
                }
300
            )},
301
        {repo, hoconsc:mk(string(), #{example => "https://github.com/emqx/emqx-plugin-template"})},
302
        {description,
303
            hoconsc:mk(
304
                binary(),
305
                #{
306
                    desc => "Plugin description.",
307
                    required => true,
308
                    example => "This is an demo plugin description"
309
                }
310
            )},
311
        {running_status,
312
            hoconsc:mk(
313
                hoconsc:array(hoconsc:ref(running_status)),
314
                #{required => true}
315
            )},
316
        {readme,
317
            hoconsc:mk(binary(), #{
318
                example => "This is an demo plugin.",
319
                desc => "only return when `GET /plugins/{name}`.",
320
                required => false
321
            })}
322
    ];
323
fields(name) ->
324
    [
325
        {name,
350✔
326
            hoconsc:mk(
327
                binary(),
328
                #{
329
                    desc => list_to_binary(?NAME_RE),
330
                    example => "emqx_plugin_template-5.0-rc.1",
331
                    in => path,
332
                    validator => fun ?MODULE:validate_name/1
333
                }
334
            )}
335
    ];
336
fields(builder) ->
337
    [
338
        {contact, hoconsc:mk(string(), #{example => "emqx-support@emqx.io"})},
324✔
339
        {name, hoconsc:mk(string(), #{example => "EMQX Team"})},
340
        {website, hoconsc:mk(string(), #{example => "www.emqx.com"})}
341
    ];
342
fields(position) ->
343
    [
344
        {position,
328✔
345
            hoconsc:mk(
346
                hoconsc:union([front, rear, binary()]),
347
                #{
348
                    desc =>
349
                        ""
350
                        "\n"
351
                        "             Enable auto-boot at position in the boot list, where Position could be\n"
352
                        "             'front', 'rear', or 'before:other-vsn', 'after:other-vsn'\n"
353
                        "             to specify a relative position.\n"
354
                        "            "
355
                        "",
356
                    required => false
357
                }
358
            )}
359
    ];
360
fields(running_status) ->
361
    [
362
        {node, hoconsc:mk(string(), #{example => "emqx@127.0.0.1"})},
324✔
363
        {status,
364
            hoconsc:mk(hoconsc:enum([running, stopped]), #{
365
                desc =>
366
                    "Install plugin status at runtime<br/>"
367
                    "1. running: plugin is running.<br/>"
368
                    "2. stopped: plugin is stopped.<br/>"
369
            })}
370
    ].
371

372
move_request_body() ->
373
    emqx_dashboard_swagger:schema_with_examples(
328✔
374
        hoconsc:ref(?MODULE, position),
375
        #{
376
            move_to_front => #{
377
                summary => <<"move plugin on the front">>,
378
                value => #{position => <<"front">>}
379
            },
380
            move_to_rear => #{
381
                summary => <<"move plugin on the rear">>,
382
                value => #{position => <<"rear">>}
383
            },
384
            move_to_before => #{
385
                summary => <<"move plugin before other plugins">>,
386
                value => #{position => <<"before:emqx_plugin_demo-5.1-rc.2">>}
387
            },
388
            move_to_after => #{
389
                summary => <<"move plugin after other plugins">>,
390
                value => #{position => <<"after:emqx_plugin_demo-5.1-rc.2">>}
391
            }
392
        }
393
    ).
394

395
validate_name(Name) ->
396
    NameLen = byte_size(Name),
36✔
397
    case NameLen > 0 andalso NameLen =< 256 of
36✔
398
        true ->
399
            case re:run(Name, ?NAME_RE) of
36✔
400
                nomatch ->
401
                    {
×
402
                        error,
403
                        "Name should be an application name"
404
                        " (starting with a letter, containing letters, digits and underscores)"
405
                        " followed with a dash and a version string "
406
                        " (can contain letters, digits, dots, and dashes), "
407
                        " e.g. emqx_plugin_template-5.0-rc.1"
408
                    };
409
                _ ->
410
                    ok
36✔
411
            end;
412
        false ->
413
            {error, "Name Length must =< 256"}
×
414
    end.
415

416
%% Request filter for `POST /plugins/install`.
%%
%% Takes the single uploaded file out of the multipart `plugin` field, derives
%% the plugin `Name-Vsn` from the file name by removing one literal ".tar.gz"
%% suffix, and validates it with validate_name/1.
%%
%% Returns `{ok, Params}` with `name` and `bin` added on success, or a
%% `{400, ErrorMap}` response when the name is invalid or the form data does
%% not have the expected shape.
validate_file_name(#{body := #{<<"plugin">> := Plugin}} = Params, _Meta) when is_map(Plugin) ->
    [{FileName, Bin}] = maps:to_list(maps:without([type], Plugin)),
    %% NOTE: string:trim/3 must not be used here: its third argument is a set
    %% of characters to strip, not a suffix, so "foo-1.0-beta.tar.gz" would be
    %% cut down to "foo-1.0-be". Split on the last ".tar.gz" instead and accept
    %% the result only when nothing follows it.
    NameVsn =
        case string:split(FileName, ".tar.gz", trailing) of
            [Base, Rest] when Rest =:= <<>>; Rest =:= [] -> Base;
            _ -> FileName
        end,
    case validate_name(NameVsn) of
        ok ->
            {ok, Params#{name => NameVsn, bin => Bin}};
        {error, Reason} ->
            {400, #{
                code => 'BAD_PLUGIN_INFO',
                message => iolist_to_binary(["Bad plugin file name: ", FileName, ". ", Reason])
            }}
    end;
validate_file_name(_Params, _Meta) ->
    {400, #{
        code => 'BAD_FORM_DATA',
        message =>
            <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>
    }}.
434

435
%% API CallBack Begin
436
list_plugins(get, _) ->
437
    Nodes = emqx:running_nodes(),
12✔
438
    {Plugins, []} = emqx_mgmt_api_plugins_proto_v4:get_plugins(Nodes),
12✔
439
    {200, format_plugins(Plugins)}.
12✔
440

441
get_plugins() ->
442
    {node(), emqx_plugins:list()}.
19✔
443

444
upload_install(post, #{name := NameVsn, bin := Bin}) ->
445
    case emqx_plugins:describe(NameVsn) of
10✔
446
        {error, #{msg := "bad_info_file", reason := {enoent, _Path}}} ->
447
            case emqx_plugins:is_package_present(NameVsn) of
10✔
448
                false ->
449
                    install_package_on_nodes(NameVsn, Bin);
10✔
450
                {true, TarGzs} ->
451
                    %% TODO
452
                    %% What if a tar file is present but is not unpacked, i.e.
453
                    %% the plugin is not fully installed?
454
                    {400, #{
×
455
                        code => 'ALREADY_INSTALLED',
456
                        message => iolist_to_binary(io_lib:format("~p already installed", [TarGzs]))
457
                    }}
458
            end;
459
        {ok, _} ->
460
            {400, #{
×
461
                code => 'ALREADY_INSTALLED',
462
                message => iolist_to_binary(io_lib:format("~p is already installed", [NameVsn]))
463
            }}
464
    end;
465
upload_install(post, #{}) ->
466
    {400, #{
×
467
        code => 'BAD_FORM_DATA',
468
        message =>
469
            <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>
470
    }}.
471

472
install_package_on_nodes(NameVsn, Bin) ->
473
    case emqx_plugins:is_allowed_installation(NameVsn) of
10✔
474
        true ->
475
            do_install_package_on_nodes(NameVsn, Bin);
7✔
476
        false ->
477
            Msg = iolist_to_binary([
3✔
478
                <<"Package is not allowed installation;">>,
479
                <<" first allow it to be installed by running:">>,
480
                <<" `emqx ctl plugins allow ">>,
481
                NameVsn,
482
                <<"`">>
483
            ]),
484
            {403, #{code => 'FORBIDDEN', message => Msg}}
3✔
485
    end.
486

487
do_install_package_on_nodes(NameVsn, Bin) ->
488
    %% TODO: handle bad nodes
489
    Nodes = emqx:running_nodes(),
7✔
490
    {[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v4:install_package(Nodes, NameVsn, Bin),
7✔
491
    case lists:filter(fun(R) -> R =/= ok end, Res) of
7✔
492
        [] ->
493
            {204};
6✔
494
        Filtered ->
495
            %% crash if we have unexpected errors or results
496
            [] = lists:filter(
1✔
497
                fun
498
                    ({error, {failed, _}}) -> true;
×
499
                    ({error, _}) -> false
1✔
500
                end,
501
                Filtered
502
            ),
503
            Reason =
1✔
504
                case hd(Filtered) of
505
                    {error, #{msg := Reason0}} -> Reason0;
1✔
506
                    {error, #{reason := Reason0}} -> Reason0
×
507
                end,
508
            {400, #{
1✔
509
                code => 'BAD_PLUGIN_INFO',
510
                message => iolist_to_binary([bin(Reason), ": ", NameVsn])
511
            }}
512
    end.
513

514
plugin(get, #{bindings := #{name := NameVsn}}) ->
515
    Nodes = emqx:running_nodes(),
5✔
516
    {Plugins, _} = emqx_mgmt_api_plugins_proto_v4:describe_package(Nodes, NameVsn),
5✔
517
    case format_plugins(Plugins) of
5✔
518
        [Plugin] -> {200, Plugin};
5✔
519
        [] -> {404, #{code => 'NOT_FOUND', message => NameVsn}}
×
520
    end;
521
plugin(delete, #{bindings := #{name := NameVsn}}) ->
522
    Res = emqx_mgmt_api_plugins_proto_v4:delete_package(NameVsn),
5✔
523
    return(204, Res).
5✔
524

525
update_plugin(put, #{bindings := #{name := NameVsn, action := Action}}) ->
526
    Res = emqx_mgmt_api_plugins_proto_v4:ensure_action(NameVsn, Action),
6✔
527
    return(204, Res).
6✔
528

529
%% Handler for `/plugins/:name/config`.
%%
%% GET: returns the locally stored config map of the plugin as JSON, 400 when
%%      the plugin has no config, 404 when the plugin cannot be described.
%% PUT: checks the request body with emqx_plugins:decode_plugin_config_map/2
%%      and, when it is accepted, sends the original JSON map to all running
%%      nodes. The per-node results are turned into an HTTP response by
%%      return_config_update_result/1.
plugin_config(get, #{bindings := #{name := NameVsn}}) ->
    case emqx_plugins:describe(NameVsn) of
        {ok, _} ->
            case emqx_plugins:get_config(NameVsn, ?plugin_conf_not_found) of
                Config when is_map(Config) ->
                    %% The header value must be the bare media type; wrapping it
                    %% in single quotes produces an invalid Content-Type.
                    {200, #{<<"content-type">> => <<"application/json">>}, Config};
                ?plugin_conf_not_found ->
                    {400, #{
                        code => 'BAD_CONFIG',
                        message => <<"Plugin Config Not Found">>
                    }}
            end;
        _ ->
            {404, plugin_not_found_msg()}
    end;
plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) ->
    Nodes = emqx:running_nodes(),
    case emqx_plugins:describe(NameVsn) of
        {ok, _} ->
            case emqx_plugins:decode_plugin_config_map(NameVsn, AvroJsonMap) of
                {ok, _} ->
                    %% The decoded value is only used as a validity check.
                    %% Whether or not the plugin ships an avro schema, the
                    %% cluster call carries the JSON map (binary keys/values)
                    %% as it was received.
                    Res = emqx_mgmt_api_plugins_proto_v4:update_plugin_config(
                        Nodes, NameVsn, AvroJsonMap
                    ),
                    return_config_update_result(Res);
                {error, Reason} ->
                    {400, #{
                        code => 'BAD_CONFIG',
                        message => readable_error_msg(Reason)
                    }}
            end;
        _ ->
            {404, plugin_not_found_msg()}
    end.
570

571
plugin_schema(get, #{bindings := #{name := NameVsn}}) ->
572
    case emqx_plugins:describe(NameVsn) of
×
573
        {ok, _Plugin} ->
574
            {200, format_plugin_avsc_and_i18n(NameVsn)};
×
575
        _ ->
576
            {404, plugin_not_found_msg()}
×
577
    end.
578

579
update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
580
    case parse_position(Body, Name) of
4✔
581
        {error, Reason} ->
582
            {400, #{code => 'BAD_POSITION', message => Reason}};
×
583
        Position ->
584
            case emqx_plugins:ensure_enabled(Name, Position, global) of
4✔
585
                ok ->
586
                    {204};
4✔
587
                {error, Reason} ->
588
                    {400, #{
×
589
                        code => 'MOVE_FAILED',
590
                        message => readable_error_msg(Reason)
591
                    }}
592
            end
593
    end.
594

595
%% API CallBack End
596

597
%% For RPC upload_install/2
598
%% Legacy RPC entry point: accepts the uploaded file name (with or without the
%% ".tar.gz" extension), reduces it to `Name-Vsn` and delegates to
%% install_package_v4/2.
install_package(FileName, Bin) ->
    %% Strip exactly one literal ".tar.gz" suffix. string:trim/3 is unsuitable
    %% because it strips any trailing characters from the given set, which
    %% damages versions such as "1.0-beta" or "1.0-rc.a".
    NameVsn =
        case string:split(FileName, ".tar.gz", trailing) of
            [Base, Rest] when Rest =:= <<>>; Rest =:= [] -> Base;
            _ -> FileName
        end,
    install_package_v4(NameVsn, Bin).
×
601

602
install_package_v4(NameVsn, Bin) ->
603
    ok = emqx_plugins:write_package(NameVsn, Bin),
9✔
604
    case emqx_plugins:ensure_installed(NameVsn, ?fresh_install) of
9✔
605
        {error, #{reason := plugin_not_found}} = NotFound ->
606
            NotFound;
×
607
        {error, Reason} = Error ->
608
            ?SLOG(error, Reason#{msg => "failed_to_install_plugin"}),
1✔
609
            _ = emqx_plugins:delete_package(NameVsn),
1✔
610
            Error;
1✔
611
        Result ->
612
            Result
8✔
613
    end.
614

615
%% For RPC plugin get
616
describe_package(NameVsn) ->
617
    Node = node(),
5✔
618
    case emqx_plugins:describe(NameVsn) of
5✔
619
        {ok, Plugin} -> {Node, [Plugin]};
5✔
620
        _ -> {Node, []}
×
621
    end.
622

623
%% NOTE: Do not remove delete_package/1; it is kept for cluster_rpc calls from nodes older than v571.
624
delete_package(NameVsn) ->
625
    delete_package(NameVsn, #{}).
×
626

627
%% For RPC plugin delete
628
delete_package(NameVsn, _Opts) ->
629
    _ = emqx_plugins:forget_allowed_installation(NameVsn),
5✔
630
    case emqx_plugins:ensure_stopped(NameVsn) of
5✔
631
        ok ->
632
            _ = emqx_plugins:ensure_disabled(NameVsn),
4✔
633
            _ = emqx_plugins:ensure_uninstalled(NameVsn),
4✔
634
            _ = emqx_plugins:delete_package(NameVsn),
4✔
635
            ok;
4✔
636
        Error ->
637
            Error
1✔
638
    end.
639

640
%% NOTE: Do not remove ensure_action/2; it is kept for cluster_rpc calls from nodes older than v571.
641
ensure_action(Name, Action) ->
642
    ensure_action(Name, Action, #{}).
×
643

644
%% for RPC plugin update
645
%% TODO: catch thrown error to return 400
646
%% - plugin_not_found
647
%% - otp vsn assertion failed
648

649
ensure_action(Name, start, _Opts) ->
650
    _ = emqx_plugins:ensure_started(Name),
6✔
651
    _ = emqx_plugins:ensure_enabled(Name),
6✔
652
    ok;
6✔
653
ensure_action(Name, stop, _Opts) ->
654
    _ = emqx_plugins:ensure_stopped(Name),
2✔
655
    _ = emqx_plugins:ensure_disabled(Name),
2✔
656
    ok;
2✔
657
ensure_action(Name, restart, _Opts) ->
658
    _ = emqx_plugins:ensure_enabled(Name),
×
659
    _ = emqx_plugins:restart(Name),
×
660
    ok.
×
661

662
%% for RPC plugin avro encoded config update
663
-spec do_update_plugin_config(name_vsn(), map() | binary(), any()) ->
664
    ok.
665
do_update_plugin_config(NameVsn, AvroJsonMap, _AvroValue) ->
NEW
666
    case do_update_plugin_config_v4(NameVsn, AvroJsonMap) of
×
NEW
667
        ok -> ok;
×
NEW
668
        {error, Reason} -> error(Reason)
×
669
    end.
670

671
-spec do_update_plugin_config_v4(name_vsn(), map() | binary()) ->
672
    ok | {error, term()}.
673
do_update_plugin_config_v4(NameVsn, AvroJsonMap) when is_binary(AvroJsonMap) ->
674
    do_update_plugin_config_v4(NameVsn, emqx_utils_json:decode(AvroJsonMap));
×
675
do_update_plugin_config_v4(NameVsn, AvroJsonMap) ->
676
    emqx_plugins:update_config(NameVsn, AvroJsonMap).
6✔
677

678
%%--------------------------------------------------------------------
679
%% Helper functions
680
%%--------------------------------------------------------------------
681

682
return(Code, ok) ->
683
    {Code};
10✔
684
return(_, {error, #{msg := Msg, reason := {enoent, Path} = Reason}}) ->
685
    ?SLOG(error, #{msg => Msg, reason => Reason}),
1✔
686
    {404, #{code => 'NOT_FOUND', message => iolist_to_binary([Path, " does not exist"])}};
1✔
687
return(_, {error, Reason}) ->
688
    {400, #{code => 'PARAM_ERROR', message => readable_error_msg(Reason)}}.
×
689

690
%% Translate the `{Responses, BadNodes}` result of a multi-node config update
%% into an HTTP reply:
%%   - every response is `ok` and there are no bad nodes -> 204
%%   - some responses are not `ok`, no bad nodes         -> 400 BAD_CONFIG
%%   - at least one bad node                             -> 500 INTERNAL_ERROR
return_config_update_result({Responses, BadNodes}) ->
    Failed = lists:filter(fun(Response) -> Response =/= ok end, Responses),
    Unreachable = [{badnode, Node} || Node <- BadNodes],
    case Failed ++ Unreachable of
        [] ->
            {204};
        _ when Unreachable =:= [] ->
            {400, #{code => 'BAD_CONFIG', message => readable_error_msg(Failed)}};
        AllErrors ->
            {500, #{
                code => 'INTERNAL_ERROR',
                message => readable_error_msg(AllErrors)
            }}
    end.
704

705
plugin_not_found_msg() ->
706
    #{
×
707
        code => 'NOT_FOUND',
708
        message => <<"Plugin Not Found">>
709
    }.
710

711
readable_error_msg(Msg) ->
712
    emqx_utils:readable_error_msg(Msg).
3✔
713

714
%% Convert the `position` field of a move request body into the position term
%% passed to emqx_plugins:ensure_enabled/3.
%%
%% Returns `front`, `rear`, `{before, Target}` or `{behind, Target}` (Target as
%% a string), or `{error, Message}` when the target is empty, refers to the
%% plugin itself (`Name`), or the body has no recognised position. In the last
%% case the message is the printed request body.
parse_position(Body, Name) ->
    case Body of
        #{<<"position">> := <<"front">>} ->
            front;
        #{<<"position">> := <<"rear">>} ->
            rear;
        #{<<"position">> := <<"before:", Target/binary>>} when Target =:= Name ->
            {error, <<"Invalid parameter. Cannot be placed before itself">>};
        #{<<"position">> := <<"after:", Target/binary>>} when Target =:= Name ->
            {error, <<"Invalid parameter. Cannot be placed after itself">>};
        #{<<"position">> := <<"before:">>} ->
            {error, <<"Invalid parameter. Cannot be placed before an empty target">>};
        #{<<"position">> := <<"after:">>} ->
            {error, <<"Invalid parameter. Cannot be placed after an empty target">>};
        #{<<"position">> := <<"before:", Target/binary>>} ->
            {before, binary_to_list(Target)};
        #{<<"position">> := <<"after:", Target/binary>>} ->
            {behind, binary_to_list(Target)};
        _ ->
            {error, iolist_to_binary(io_lib:format("~p", [Body]))}
    end.
×
732

733
format_plugins(List) ->
734
    StatusMap = aggregate_status(List),
17✔
735
    SortFun = fun({_N1, P1}, {_N2, P2}) -> length(P1) > length(P2) end,
17✔
736
    SortList = lists:sort(SortFun, List),
17✔
737
    pack_status_in_order(SortList, StatusMap).
17✔
738

739
pack_status_in_order(List, StatusMap) ->
740
    {Plugins, _} =
17✔
741
        lists:foldl(
742
            fun({_Node, PluginList}, {Acc, StatusAcc}) ->
743
                pack_plugin_in_order(PluginList, Acc, StatusAcc)
24✔
744
            end,
745
            {[], StatusMap},
746
            List
747
        ),
748
    lists:reverse(Plugins).
17✔
749

750
%% Walk one node's plugin list and, for every plugin whose `{Name, Vsn}` key is
%% still present in the pending status map, push the plugin (with its own
%% `running_status`/`config_status` replaced by the aggregated status list)
%% onto the accumulator and drop the key from the pending map.
%%
%% Stops early once the list is exhausted or no status entries remain.
%% Returns `{Packed, Pending}`; `Packed` is in reverse order of insertion.
pack_plugin_in_order(PluginList, Packed, Pending) when
    PluginList =:= []; map_size(Pending) =:= 0
->
    {Packed, Pending};
pack_plugin_in_order([Plugin | Rest], Packed, Pending) ->
    #{name := Name, rel_vsn := Vsn} = Plugin,
    case maps:take({Name, Vsn}, Pending) of
        {Status, StillPending} ->
            Stripped = maps:without([running_status, config_status], Plugin),
            WithStatus = Stripped#{running_status => Status},
            pack_plugin_in_order(Rest, [WithStatus | Packed], StillPending);
        error ->
            %% Already packed from another node's list: skip it.
            pack_plugin_in_order(Rest, Packed, Pending)
    end.
764

765
aggregate_status(List) -> aggregate_status(List, #{}).
17✔
766

767
%% Fold a list of `{Node, Plugins}` pairs into a map from `{Name, Vsn}` to the
%% list of `#{node => Node, status => Status}` entries collected for that
%% plugin, where Status comes from plugin_status/1.
aggregate_status([], Acc) ->
    Acc;
aggregate_status([{Node, Plugins} | List], Acc) ->
    NewAcc =
        lists:foldl(
            fun(Plugin, SubAcc) ->
                #{name := Name, rel_vsn := Vsn} = Plugin,
                Key = {Name, Vsn},
                Value = #{node => Node, status => plugin_status(Plugin)},
                %% Read the previous entries from the fold accumulator, not
                %% from the outer Acc: otherwise a key that occurs more than
                %% once within this node's list overwrites its own earlier
                %% entry instead of extending it.
                SubAcc#{Key => [Value | maps:get(Key, SubAcc, [])]}
            end,
            Acc,
            Plugins
        ),
    aggregate_status(List, NewAcc).
24✔
782

783
-dialyzer({nowarn_function, format_plugin_avsc_and_i18n/1}).
784
format_plugin_avsc_and_i18n(NameVsn) ->
785
    case emqx_release:edition() of
×
786
        ee ->
787
            #{
×
788
                avsc => or_null(emqx_plugins:plugin_schema(NameVsn)),
789
                i18n => or_null(emqx_plugins:plugin_i18n(NameVsn))
790
            };
791
        ce ->
792
            #{avsc => null, i18n => null}
×
793
    end.
794

795
%% Unwrap an `{ok, Value}` result; anything else becomes the atom `null`.
or_null(Result) ->
    case Result of
        {ok, Value} -> Value;
        _ -> null
    end.
×
797

798
%% Coerce a binary, a character list or an atom to a binary.
%% Any other term fails with function_clause.
bin(Binary) when is_binary(Binary) -> Binary;
bin(Chars) when is_list(Chars) -> list_to_binary(Chars);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
×
801

802
% running_status: running loaded, stopped
803
%% config_status: not_configured disable enable
804
%% Reduce a plugin description to the status reported by the API:
%% `running` only when its `running_status` is exactly `running`,
%% `stopped` for everything else.
plugin_status(Plugin) ->
    case Plugin of
        #{running_status := running} -> running;
        _ -> stopped
    end.
15✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc