• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 14222587000

02 Apr 2025 02:46PM UTC coverage: 83.451%. First build
14222587000

Pull #14972

github

web-flow
Merge d25c1943f into eff0dc94c
Pull Request #14972: feat(plugins): implement config upload/download

17 of 19 new or added lines in 2 files covered. (89.47%)

62145 of 74469 relevant lines covered (83.45%)

16805.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.54
/apps/emqx_management/src/emqx_mgmt_api_plugins.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16
-module(emqx_mgmt_api_plugins).
17

18
-behaviour(minirest_api).
19

20
-include_lib("typerefl/include/types.hrl").
21
-include_lib("emqx/include/logger.hrl").
22
-include_lib("emqx_plugins/include/emqx_plugins.hrl").
23
-include_lib("erlavro/include/erlavro.hrl").
24

25
-export([
26
    api_spec/0,
27
    fields/1,
28
    paths/0,
29
    schema/1,
30
    namespace/0
31
]).
32

33
-export([
34
    list_plugins/2,
35
    upload_install/2,
36
    plugin/2,
37
    update_plugin/2,
38
    plugin_config/2,
39
    upload_plugin_config/2,
40
    download_plugin_config/2,
41
    plugin_schema/2,
42
    update_boot_order/2
43
]).
44

45
-export([
46
    validate_name/1,
47
    validate_file_name/2,
48
    get_plugins/0,
49
    install_package/2,
50
    install_package_v4/2,
51
    delete_package/1,
52
    delete_package/2,
53
    describe_package/1,
54
    ensure_action/2,
55
    ensure_action/3,
56
    do_update_plugin_config/3,
57
    do_update_plugin_config_v4/2
58
]).
59

60
-define(NAME_RE, "^[A-Za-z]+\\w*\\-[\\w-.]*$").
61
-define(TAGS, [<<"Plugins">>]).
62

63
-define(CONTENT_PLUGIN, plugin).
64

65
%% Returns the schema namespace string used by this API module.
namespace() -> "plugins".
6,825✔
67

68
%% Builds the API spec for this module through emqx_dashboard_swagger,
%% with request schema checking switched on.
api_spec() ->
    SpecOpts = #{check_schema => true},
    emqx_dashboard_swagger:spec(?MODULE, SpecOpts).
325✔
70

71
%% Lists every route served by this module.
%% The order of the entries is significant: do not reorder them.
paths() ->
    Root = "/plugins",
    SubPaths = [
        "/:name",
        "/install",
        "/:name/:action",
        "/:name/config",
        "/:name/config/download",
        "/:name/config/upload",
        "/:name/schema",
        "/:name/move"
    ],
    [Root | [Root ++ Sub || Sub <- SubPaths]].
84

85
%% Per-path endpoint schema: operation id, HTTP methods, parameters,
%% request body and documented responses.
%% The documented response codes are kept in line with what the handlers in
%% this module actually return (e.g. 403 FORBIDDEN on install, BAD_POSITION on
%% move, PARAM_ERROR on start/stop).
schema("/plugins") ->
    #{
        'operationId' => list_plugins,
        get => #{
            summary => <<"List all installed plugins">>,
            description =>
                "Plugins are launched in top-down order.<br/>"
                "Use `POST /plugins/{name}/move` to change the boot order.",
            tags => ?TAGS,
            responses => #{
                200 => hoconsc:array(hoconsc:ref(plugin))
            }
        }
    };
schema("/plugins/install") ->
    #{
        'operationId' => upload_install,
        %% validate_file_name/2 runs as a request filter before upload_install/2
        filter => fun ?MODULE:validate_file_name/2,
        post => #{
            summary => <<"Install a new plugin">>,
            description =>
                "Upload a plugin tarball (plugin-vsn.tar.gz). "
                "Follow [emqx-plugin-template](https://github.com/emqx/emqx-plugin-template) "
                "to develop plugin.",
            tags => ?TAGS,
            'requestBody' => #{
                content => #{
                    'multipart/form-data' => #{
                        schema => #{
                            type => object,
                            properties => #{
                                ?CONTENT_PLUGIN => #{type => string, format => binary}
                            }
                        },
                        encoding => #{?CONTENT_PLUGIN => #{'contentType' => 'application/gzip'}}
                    }
                }
            },
            responses => #{
                204 => <<"Install plugin successfully">>,
                400 => emqx_dashboard_swagger:error_codes(
                    [
                        'UNEXPECTED_ERROR',
                        'ALREADY_INSTALLED',
                        'BAD_PLUGIN_INFO',
                        'BAD_FORM_DATA'
                    ]
                ),
                %% install_package_on_nodes/2 answers 403 when the package is not allowed
                403 => emqx_dashboard_swagger:error_codes(
                    ['FORBIDDEN'], <<"Plugin installation is not allowed">>
                )
            }
        }
    };
schema("/plugins/:name") ->
    #{
        'operationId' => plugin,
        get => #{
            summary => <<"Get a plugin description">>,
            description => "Describe a plugin according to its `release.json` and `README.md`.",
            tags => ?TAGS,
            parameters => [hoconsc:ref(name)],
            responses => #{
                200 => hoconsc:ref(plugin),
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
            }
        },
        delete => #{
            summary => <<"Delete a plugin">>,
            description => "Uninstalls a previously uploaded plugin package.",
            tags => ?TAGS,
            parameters => [hoconsc:ref(name)],
            responses => #{
                204 => <<"Uninstall successfully">>,
                400 => emqx_dashboard_swagger:error_codes(['PARAM_ERROR'], <<"Bad parameter">>),
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
            }
        }
    };
schema("/plugins/:name/:action") ->
    #{
        'operationId' => update_plugin,
        put => #{
            summary => <<"Trigger action on an installed plugin">>,
            description =>
                "start/stop an installed plugin.<br/>"
                "- **start**: start the plugin.<br/>"
                "- **stop**: stop the plugin.<br/>",
            tags => ?TAGS,
            parameters => [
                hoconsc:ref(name),
                {action, hoconsc:mk(hoconsc:enum([start, stop]), #{desc => "Action", in => path})}
            ],
            responses => #{
                204 => <<"Trigger action successfully">>,
                %% update_plugin/2 shares return/2 with delete, which may answer 400
                400 => emqx_dashboard_swagger:error_codes(['PARAM_ERROR'], <<"Bad parameter">>),
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
            }
        }
    };
schema("/plugins/:name/config") ->
    #{
        'operationId' => plugin_config,
        get => #{
            summary => <<"Get plugin config">>,
            description =>
                "Get plugin config. Config schema is defined by user's schema.avsc file.<br/>",
            tags => ?TAGS,
            parameters => [hoconsc:ref(name)],
            responses => #{
                %% avro data, json encoded
                200 => hoconsc:mk(binary()),
                400 => emqx_dashboard_swagger:error_codes(
                    ['BAD_CONFIG'], <<"Plugin Config Not Found">>
                ),
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
            }
        },
        put => #{
            summary =>
                <<"Update plugin config">>,
            description =>
                "Update plugin config. Config schema defined by user's schema.avsc file.<br/>",
            tags => ?TAGS,
            parameters => [hoconsc:ref(name)],
            'requestBody' => #{
                content => #{
                    'application/json' => #{
                        schema => #{
                            type => object
                        }
                    }
                }
            },
            responses => #{
                204 => <<"Config updated successfully">>,
                400 => emqx_dashboard_swagger:error_codes(
                    ['BAD_CONFIG', 'UNEXPECTED_ERROR'], <<"Update plugin config failed">>
                ),
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>),
                500 => emqx_dashboard_swagger:error_codes(['INTERNAL_ERROR'], <<"Internal Error">>)
            }
        }
    };
schema("/plugins/:name/config/download") ->
    #{
        'operationId' => download_plugin_config,
        get => #{
            summary => <<"Download plugin config">>,
            description => "Download plugin config",
            tags => ?TAGS,
            parameters => [hoconsc:ref(name)],
            responses => #{
                200 => hoconsc:mk(binary()),
                400 => emqx_dashboard_swagger:error_codes(
                    ['BAD_CONFIG'], <<"Plugin Config Not Found">>
                ),
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
            }
        }
    };
schema("/plugins/:name/config/upload") ->
    #{
        'operationId' => upload_plugin_config,
        post => #{
            summary =>
                <<"Upload plugin config">>,
            description => "Upload plugin config",
            tags => ?TAGS,
            parameters => [hoconsc:ref(name)],
            'requestBody' => #{
                content => #{
                    'multipart/form-data' => #{
                        schema => #{
                            type => object,
                            properties => #{
                                config => #{type => string, format => binary}
                            }
                        },
                        encoding => #{config => #{'contentType' => 'application/json'}}
                    }
                }
            },
            responses => #{
                204 => <<"Config updated successfully">>,
                400 => emqx_dashboard_swagger:error_codes(
                    ['BAD_CONFIG', 'UNEXPECTED_ERROR'], <<"Update plugin config failed">>
                ),
                404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>),
                500 => emqx_dashboard_swagger:error_codes(['INTERNAL_ERROR'], <<"Internal Error">>)
            }
        }
    };
schema("/plugins/:name/schema") ->
    #{
        'operationId' => plugin_schema,
        get => #{
            summary => <<"Get installed plugin's AVRO schema">>,
            description => "Get plugin's config AVRO schema.",
            tags => ?TAGS,
            parameters => [hoconsc:ref(name)],
            responses => #{
                %% avro schema and i18n json object
                200 => hoconsc:mk(binary()),
                404 => emqx_dashboard_swagger:error_codes(
                    ['NOT_FOUND', 'FILE_NOT_EXISTED'],
                    <<"Plugin Not Found or Plugin not given a schema file">>
                )
            }
        }
    };
schema("/plugins/:name/move") ->
    #{
        'operationId' => update_boot_order,
        post => #{
            summary => <<"Move plugin within plugin hierarchy">>,
            description => "Setting the boot order of plugins.",
            tags => ?TAGS,
            parameters => [hoconsc:ref(name)],
            'requestBody' => move_request_body(),
            responses => #{
                204 => <<"Boot order changed successfully">>,
                %% update_boot_order/2 answers BAD_POSITION for an unparsable position
                400 => emqx_dashboard_swagger:error_codes(
                    ['MOVE_FAILED', 'BAD_POSITION'], <<"Move failed">>
                )
            }
        }
    }.
308

309
%% Field definitions for the schema refs used by this module:
%% plugin, health_status, name (path parameter), builder, position and
%% running_status.
fields(plugin) ->
    [
        {name,
            hoconsc:mk(
                binary(),
                #{
                    desc => "Name-Vsn: without .tar.gz",
                    validator => fun ?MODULE:validate_name/1,
                    required => true,
                    example => "emqx_plugin_template-5.0-rc.1"
                }
            )},
        {author, hoconsc:mk(list(string()), #{example => [<<"EMQX Team">>]})},
        {builder, hoconsc:ref(?MODULE, builder)},
        {built_on_otp_release, hoconsc:mk(string(), #{example => "24"})},
        {compatibility, hoconsc:mk(map(), #{example => #{<<"emqx">> => <<"~>5.0">>}})},
        {git_commit_or_build_date,
            hoconsc:mk(string(), #{
                example => "2021-12-25",
                desc =>
                    "Last git commit date by `git log -1 --pretty=format:'%cd' "
                    "--date=format:'%Y-%m-%d'`.\n"
                    " If the last commit date is not available, the build date will be presented."
            })},
        {functionality, hoconsc:mk(hoconsc:array(string()), #{example => [<<"Demo">>]})},
        {git_ref, hoconsc:mk(string(), #{example => "ddab50fafeed6b1faea70fc9ffd8c700d7e26ec1"})},
        {metadata_vsn, hoconsc:mk(string(), #{example => "0.1.0"})},
        {rel_vsn,
            hoconsc:mk(
                binary(),
                #{
                    desc => "Plugins release version",
                    required => true,
                    example => <<"5.0-rc.1">>
                }
            )},
        {rel_apps,
            hoconsc:mk(
                hoconsc:array(binary()),
                #{
                    desc => "Applications in plugin.",
                    required => true,
                    example => [<<"emqx_plugin_template-5.0.0">>, <<"map_sets-1.1.0">>]
                }
            )},
        {repo, hoconsc:mk(string(), #{example => "https://github.com/emqx/emqx-plugin-template"})},
        {description,
            hoconsc:mk(
                binary(),
                #{
                    desc => "Plugin description.",
                    required => true,
                    example => "This is a demo plugin description"
                }
            )},
        {running_status,
            hoconsc:mk(
                hoconsc:array(hoconsc:ref(running_status)),
                #{required => true}
            )},
        {readme,
            hoconsc:mk(binary(), #{
                example => "This is a demo plugin.",
                desc => "only return when `GET /plugins/{name}`.",
                required => false
            })},
        {health_status, hoconsc:ref(?MODULE, health_status)}
    ];
fields(health_status) ->
    [
        {status, hoconsc:mk(hoconsc:enum([ok, error]), #{example => error})},
        {message, hoconsc:mk(binary(), #{example => <<"Port unavailable: 3306">>})}
    ];
fields(name) ->
    [
        {name,
            hoconsc:mk(
                binary(),
                #{
                    %% the regex itself is shown as the parameter description
                    desc => list_to_binary(?NAME_RE),
                    example => "emqx_plugin_template-5.0-rc.1",
                    in => path,
                    validator => fun ?MODULE:validate_name/1
                }
            )}
    ];
fields(builder) ->
    [
        {contact, hoconsc:mk(string(), #{example => "emqx-support@emqx.io"})},
        {name, hoconsc:mk(string(), #{example => "EMQX Team"})},
        {website, hoconsc:mk(string(), #{example => "www.emqx.com"})}
    ];
fields(position) ->
    [
        {position,
            hoconsc:mk(
                hoconsc:union([front, rear, binary()]),
                #{
                    desc =>
                        ""
                        "\n"
                        "             Enable auto-boot at position in the boot list, where Position could be\n"
                        "             'front', 'rear', or 'before:other-vsn', 'after:other-vsn'\n"
                        "             to specify a relative position.\n"
                        "            "
                        "",
                    required => false
                }
            )}
    ];
fields(running_status) ->
    [
        {node, hoconsc:mk(string(), #{example => "emqx@127.0.0.1"})},
        {status,
            hoconsc:mk(hoconsc:enum([running, stopped]), #{
                desc =>
                    "Install plugin status at runtime<br/>"
                    "1. running: plugin is running.<br/>"
                    "2. stopped: plugin is stopped.<br/>"
            })}
    ].
430

431
%% Request body schema for `POST /plugins/{name}/move`: the `position` ref
%% together with one example per supported placement.
move_request_body() ->
    Example = fun(Summary, Position) ->
        #{summary => Summary, value => #{position => Position}}
    end,
    Examples = #{
        move_to_front => Example(<<"move plugin on the front">>, <<"front">>),
        move_to_rear => Example(<<"move plugin on the rear">>, <<"rear">>),
        move_to_before =>
            Example(
                <<"move plugin before other plugins">>,
                <<"before:emqx_plugin_demo-5.1-rc.2">>
            ),
        move_to_after =>
            Example(
                <<"move plugin after other plugins">>,
                <<"after:emqx_plugin_demo-5.1-rc.2">>
            )
    },
    emqx_dashboard_swagger:schema_with_examples(hoconsc:ref(?MODULE, position), Examples).
453

454
%% Validates a plugin Name-Vsn binary.
%% Returns `ok` when the name is 1..256 bytes long and matches ?NAME_RE,
%% otherwise `{error, Message}` with a human-readable string.
validate_name(Name) ->
    Size = byte_size(Name),
    if
        Size > 0 andalso Size =< 256 ->
            case re:run(Name, ?NAME_RE) of
                nomatch ->
                    {error,
                        "Name should be an application name"
                        " (starting with a letter, containing letters, digits and underscores)"
                        " followed with a dash and a version string "
                        " (can contain letters, digits, dots, and dashes), "
                        " e.g. emqx_plugin_template-5.0-rc.1"};
                _Matched ->
                    ok
            end;
        true ->
            {error, "Name Length must =< 256"}
    end.
474

475
%% Request filter for `POST /plugins/install`.
%% Expects the multipart `plugin` part to be a map holding exactly one
%% `FileName => Content` entry (plus an optional `type` key). On success the
%% request params are extended with `name` (file name without `.tar.gz`) and
%% `bin` (file content); otherwise a 400 response is returned.
validate_file_name(#{body := #{<<"plugin">> := Plugin}} = Params, _Meta) when is_map(Plugin) ->
    [{FileName, Bin}] = maps:to_list(maps:without([type], Plugin)),
    NameVsn = strip_tar_gz_suffix(FileName),
    case validate_name(NameVsn) of
        ok ->
            {ok, Params#{name => NameVsn, bin => Bin}};
        {error, Reason} ->
            {400, #{
                code => 'BAD_PLUGIN_INFO',
                message => iolist_to_binary(["Bad plugin file name: ", FileName, ". ", Reason])
            }}
    end;
validate_file_name(_Params, _Meta) ->
    {400, #{
        code => 'BAD_FORM_DATA',
        message =>
            <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>
    }}.

%% Removes one trailing ".tar.gz" from FileName, if present.
%% string:trim/3 must not be used for this: its last argument is a set of
%% characters to strip, so it would also eat a version ending in any of
%% `.targz` (e.g. "foo-1.0.0-beta.tar.gz" would become "foo-1.0.0-be").
strip_tar_gz_suffix(FileName) ->
    case string:split(FileName, ".tar.gz", trailing) of
        [NameVsn, Rest] ->
            %% only a match at the very end counts as a suffix
            case string:is_empty(Rest) of
                true -> NameVsn;
                false -> FileName
            end;
        [_NoMatch] ->
            FileName
    end.
493

494
%% API CallBack Begin
495
%% `GET /plugins`: collects the plugin lists of all running nodes and returns
%% them formatted. Crashes (badmatch) if the cluster call reports bad nodes.
list_plugins(get, _Request) ->
    {PerNodePlugins, []} =
        emqx_mgmt_api_plugins_proto_v4:get_plugins(emqx:running_nodes()),
    {200, format_plugins(PerNodePlugins)}.
13✔
499

500
%% Returns `{Node, Plugins}` for the local node, with health checks enabled.
get_plugins() ->
    LocalPlugins = emqx_plugins:list(?normal, #{health_check => true}),
    {node(), LocalPlugins}.
20✔
502

503
%% `POST /plugins/install`.
%% `name` and `bin` are expected in the request map (validate_file_name/2
%% adds them). Installs only when the plugin's info file is missing
%% and no package file is present; otherwise answers 400 ALREADY_INSTALLED.
upload_install(post, #{name := NameVsn, bin := Bin}) ->
    AlreadyInstalled = fun(Format, Subject) ->
        {400, #{
            code => 'ALREADY_INSTALLED',
            message => iolist_to_binary(io_lib:format(Format, [Subject]))
        }}
    end,
    case emqx_plugins:describe(NameVsn, #{}) of
        {ok, _} ->
            AlreadyInstalled("~p is already installed", NameVsn);
        {error, #{msg := "bad_info_file", reason := {enoent, _Path}}} ->
            case emqx_plugins:is_package_present(NameVsn) of
                {true, TarGzs} ->
                    %% TODO
                    %% What if a tar file is present but is not unpacked, i.e.
                    %% the plugin is not fully installed?
                    AlreadyInstalled("~p already installed", TarGzs);
                false ->
                    install_package_on_nodes(NameVsn, Bin)
            end
    end;
upload_install(post, #{}) ->
    {400, #{
        code => 'BAD_FORM_DATA',
        message =>
            <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>
    }}.
530

531
%% Installs the package on the running nodes if its installation has been
%% allowed; otherwise answers 403 with a hint on how to allow it.
install_package_on_nodes(NameVsn, Bin) ->
    case emqx_plugins:is_allowed_installation(NameVsn) of
        false ->
            Hint = iolist_to_binary([
                <<"Package is not allowed installation; first allow it to be installed by running:">>,
                <<" `emqx ctl plugins allow ">>,
                NameVsn,
                <<"`">>
            ]),
            {403, #{code => 'FORBIDDEN', message => Hint}};
        true ->
            do_install_package_on_nodes(NameVsn, Bin)
    end.
545

546
%% Runs the install RPC on all running nodes.
%% Returns `{204}` when every node answered `ok`, or a 400 BAD_PLUGIN_INFO
%% built from the first failure. Deliberately crashes on bad nodes, on
%% `{error, {failed, _}}` results and on results that are not `{error, _}`.
do_install_package_on_nodes(NameVsn, Bin) ->
    %% TODO: handle bad nodes
    RunningNodes = emqx:running_nodes(),
    {[_ | _] = Results, []} =
        emqx_mgmt_api_plugins_proto_v4:install_package(RunningNodes, NameVsn, Bin),
    case [R || R <- Results, R =/= ok] of
        [] ->
            {204};
        [FirstFailure | _] = Failures ->
            %% crash if we have unexpected errors or results
            IsUnexpected = fun
                ({error, {failed, _}}) -> true;
                ({error, _}) -> false
            end,
            [] = lists:filter(IsUnexpected, Failures),
            Reason =
                case FirstFailure of
                    {error, #{msg := Msg}} -> Msg;
                    {error, #{reason := Why}} -> Why
                end,
            Message = iolist_to_binary([bin(Reason), ": ", NameVsn]),
            {400, #{code => 'BAD_PLUGIN_INFO', message => Message}}
    end.
572

573
%% `GET /plugins/{name}`: describes the plugin across the running nodes
%% (404 when nothing is found).
%% `DELETE /plugins/{name}`: deletes the package through the cluster RPC.
plugin(get, #{bindings := #{name := NameVsn}}) ->
    {Described, _BadNodes} =
        emqx_mgmt_api_plugins_proto_v4:describe_package(emqx:running_nodes(), NameVsn),
    case format_plugins(Described) of
        [] -> {404, #{code => 'NOT_FOUND', message => NameVsn}};
        [Plugin] -> {200, Plugin}
    end;
plugin(delete, #{bindings := #{name := NameVsn}}) ->
    return(204, emqx_mgmt_api_plugins_proto_v4:delete_package(NameVsn)).
7✔
583

584
%% `PUT /plugins/{name}/{action}`: forwards the action to the cluster RPC and
%% maps its result to an HTTP response.
update_plugin(put, #{bindings := #{name := NameVsn, action := Action}}) ->
    return(204, emqx_mgmt_api_plugins_proto_v4:ensure_action(NameVsn, Action)).
8✔
587

588
%% `/plugins/{name}/config`: PUT updates the plugin config from the request
%% body, GET returns the current config.
plugin_config(put, #{bindings := #{name := NameVsn}, body := NewConfig}) ->
    put_plugin_config(NameVsn, NewConfig);
plugin_config(get, #{bindings := #{name := NameVsn}}) ->
    get_plugin_config(NameVsn).
7✔
592

593
%% `POST /plugins/{name}/config/upload`.
%% The `config` form part must be a map with a `type` key and exactly one
%% `FileName => Content` entry; the content is handed to put_plugin_config/2.
upload_plugin_config(
    post,
    #{bindings := #{name := NameVsn}, body := #{<<"config">> := #{type := _} = Upload}}
) ->
    [{_FileName, ConfigBin}] = maps:to_list(maps:remove(type, Upload)),
    put_plugin_config(NameVsn, ConfigBin).
1✔
598

599
%% `GET /plugins/{name}/config/download`.
%% Same as fetching the config, but a successful response additionally
%% carries a content-disposition header naming the file `<NameVsn>.json`.
%% Failure responses are passed through untouched.
download_plugin_config(get, #{bindings := #{name := NameVsn}}) ->
    case get_plugin_config(NameVsn) of
        {200, BaseHeaders, Config} ->
            Disposition = <<"attachment; filename=\"", NameVsn/binary, ".json\"">>,
            {200, BaseHeaders#{<<"content-disposition">> => Disposition}, Config};
        Failure ->
            Failure
    end.
610

611
%% Fetches the config of an installed plugin.
%% Returns:
%%   {200, Headers, ConfigMap}  - plugin exists and has a config map
%%   {400, BAD_CONFIG error}    - plugin exists but no config was found
%%   {404, NOT_FOUND error}     - plugin cannot be described
get_plugin_config(NameVsn) ->
    case emqx_plugins:describe(NameVsn, #{}) of
        {ok, _} ->
            case emqx_plugins:get_config(NameVsn, ?plugin_conf_not_found) of
                Config when is_map(Config) ->
                    %% The header value must be the bare media type; the previous
                    %% value carried literal single quotes ('application/json'),
                    %% which is not the JSON media type.
                    {200, #{<<"content-type">> => <<"application/json">>}, Config};
                ?plugin_conf_not_found ->
                    {400, #{
                        code => 'BAD_CONFIG',
                        message => <<"Plugin Config Not Found">>
                    }}
            end;
        _ ->
            {404, plugin_not_found_msg()}
    end.
626

627
%% Validates Config against the plugin (via decode_plugin_config_map/2) and,
%% when decoding succeeds, pushes the original Config to all running nodes.
%% 404 when the plugin cannot be described, 400 BAD_CONFIG when decoding fails.
put_plugin_config(NameVsn, Config) ->
    RunningNodes = emqx:running_nodes(),
    case emqx_plugins:describe(NameVsn, #{}) of
        {ok, _} ->
            case emqx_plugins:decode_plugin_config_map(NameVsn, Config) of
                {ok, _DecodedOrNoSchema} ->
                    %% With or without a plugin avro schema, the cluster call
                    %% receives the config as given (map with binary keys/values,
                    %% or the uploaded binary).
                    return_config_update_result(
                        emqx_mgmt_api_plugins_proto_v4:update_plugin_config(
                            RunningNodes, NameVsn, Config
                        )
                    );
                {error, Reason} ->
                    {400, #{code => 'BAD_CONFIG', message => readable_error_msg(Reason)}}
            end;
        _ ->
            {404, plugin_not_found_msg()}
    end.
653

654
%% `GET /plugins/{name}/schema`: returns the formatted avsc/i18n data of a
%% plugin that can be described, 404 otherwise.
plugin_schema(get, #{bindings := #{name := NameVsn}}) ->
    Described = emqx_plugins:describe(NameVsn, #{}),
    case Described of
        {ok, _Plugin} -> {200, format_plugin_avsc_and_i18n(NameVsn)};
        _ -> {404, plugin_not_found_msg()}
    end.
661

662
%% `POST /plugins/{name}/move`: parses the requested position and enables the
%% plugin at that position globally.
%% 400 BAD_POSITION when the position cannot be parsed, 400 MOVE_FAILED when
%% enabling fails.
update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
    case parse_position(Body, Name) of
        {error, ParseError} ->
            {400, #{code => 'BAD_POSITION', message => ParseError}};
        Position ->
            case emqx_plugins:ensure_enabled(Name, Position, global) of
                {error, MoveError} ->
                    {400, #{code => 'MOVE_FAILED', message => readable_error_msg(MoveError)}};
                ok ->
                    {204}
            end
    end.
677

678
%% API CallBack End
679

680
%% For RPC upload_install/2
%% Takes a package file name, drops one trailing ".tar.gz" (if present) and
%% delegates to install_package_v4/2.
install_package(FileName, Bin) ->
    %% string:trim/3 is not suitable here: its last argument is a set of
    %% characters, so it would also strip a version ending in any of `.targz`.
    NameVsn =
        case string:split(FileName, ".tar.gz", trailing) of
            [Stem, Rest] ->
                %% only a match at the very end counts as a suffix
                case string:is_empty(Rest) of
                    true -> Stem;
                    false -> FileName
                end;
            [_NoMatch] ->
                FileName
        end,
    install_package_v4(NameVsn, Bin).
×
684

685
%% Writes the package and installs it freshly on the local node.
%% A `plugin_not_found` error is returned as-is; any other error is logged,
%% the written package is deleted again, and the error is returned.
%% Every non-error result is passed through unchanged.
install_package_v4(NameVsn, Bin) ->
    ok = emqx_plugins:write_package(NameVsn, Bin),
    InstallResult = emqx_plugins:ensure_installed(NameVsn, ?fresh_install),
    case InstallResult of
        {error, #{reason := plugin_not_found}} ->
            InstallResult;
        {error, Reason} ->
            ?SLOG(error, Reason#{msg => "failed_to_install_plugin"}),
            %% best-effort cleanup of the package written above
            _ = emqx_plugins:delete_package(NameVsn),
            InstallResult;
        _ ->
            InstallResult
    end.
697

698
%% For RPC plugin get
%% Returns `{Node, [Plugin]}` when the plugin can be described on the local
%% node and `{Node, []}` otherwise.
describe_package(NameVsn) ->
    Found =
        case emqx_plugins:describe(NameVsn) of
            {ok, Plugin} -> [Plugin];
            _ -> []
        end,
    {node(), Found}.
705

706
%% Tip: do not remove delete_package/1; it is used by cluster_rpc before v571.
%% Same as delete_package/2 with empty options.
delete_package(NameVsn) ->
    NoOpts = #{},
    delete_package(NameVsn, NoOpts).
×
709

710
%% For RPC plugin delete
%% Forgets the installation allowance, stops the plugin and, only when the
%% stop succeeded, disables, uninstalls and deletes it (results of those three
%% steps are ignored). A failed stop is returned unchanged.
delete_package(NameVsn, _Opts) ->
    _ = emqx_plugins:forget_allowed_installation(NameVsn),
    StopResult = emqx_plugins:ensure_stopped(NameVsn),
    case StopResult of
        ok ->
            Cleanup = [
                fun emqx_plugins:ensure_disabled/1,
                fun emqx_plugins:ensure_uninstalled/1,
                fun emqx_plugins:delete_package/1
            ],
            lists:foreach(fun(Step) -> _ = Step(NameVsn) end, Cleanup),
            ok;
        _ ->
            StopResult
    end.
722

723
%% Tip: do not remove ensure_action/2; it is used by cluster_rpc before v571.
%% Same as ensure_action/3 with empty options.
ensure_action(Name, Action) ->
    NoOpts = #{},
    ensure_action(Name, Action, NoOpts).
×
726

727
%% for RPC plugin update
%% Applies `start`, `stop` or `restart` to the plugin. The results of the
%% individual emqx_plugins calls are ignored and `ok` is always returned.
%% TODO: catch thrown error to return 400
%% - plugin_not_found
%% - otp vsn assertion failed

ensure_action(Name, start, _Opts) ->
    _ = emqx_plugins:ensure_started(Name),
    _ = emqx_plugins:ensure_enabled(Name),
    ok;
ensure_action(Name, stop, _Opts) ->
    _ = emqx_plugins:ensure_stopped(Name),
    _ = emqx_plugins:ensure_disabled(Name),
    ok;
ensure_action(Name, restart, _Opts) ->
    %% enable first, then restart
    _ = emqx_plugins:ensure_enabled(Name),
    _ = emqx_plugins:restart(Name),
    ok.
×
744

745
%% for RPC plugin avro encoded config update
%% Wrapper around do_update_plugin_config_v4/2 that ignores the third
%% argument and raises an error on failure instead of returning it.
-spec do_update_plugin_config(name_vsn(), map() | binary(), any()) ->
    ok.
do_update_plugin_config(NameVsn, AvroJsonMap, _AvroValue) ->
    Outcome = do_update_plugin_config_v4(NameVsn, AvroJsonMap),
    case Outcome of
        {error, Reason} -> error(Reason);
        ok -> ok
    end.
753

754
%% Updates the plugin config on the local node. A binary argument is
%% JSON-decoded first, then handled like any other value.
-spec do_update_plugin_config_v4(name_vsn(), map() | binary()) ->
    ok | {error, term()}.
do_update_plugin_config_v4(NameVsn, Json) when is_binary(Json) ->
    Decoded = emqx_utils_json:decode(Json),
    do_update_plugin_config_v4(NameVsn, Decoded);
do_update_plugin_config_v4(NameVsn, ConfigMap) ->
    emqx_plugins:update_config(NameVsn, ConfigMap).
8✔
760

761
%%--------------------------------------------------------------------
762
%% Helper functions
763
%%--------------------------------------------------------------------
764

765
%% Maps an RPC result to an HTTP response:
%%   ok                        -> {Code}
%%   error with enoent reason  -> logged, 404 NOT_FOUND naming the path
%%   any other error           -> 400 PARAM_ERROR
return(Code, ok) ->
    {Code};
return(_Code, {error, #{msg := Msg, reason := {enoent, Path} = Reason}}) ->
    ?SLOG(error, #{msg => Msg, reason => Reason}),
    NotFound = iolist_to_binary([Path, " does not exist"]),
    {404, #{code => 'NOT_FOUND', message => NotFound}};
return(_Code, {error, Reason}) ->
    Readable = readable_error_msg(Reason),
    {400, #{code => 'PARAM_ERROR', message => Readable}}.
×
772

773
%% Turns the `{Responses, BadNodes}` result of a cluster-wide config update
%% into an HTTP response:
%%   all `ok`, no bad nodes       -> {204}
%%   some failures, no bad nodes  -> 400 BAD_CONFIG
%%   any bad node                 -> 500 INTERNAL_ERROR
return_config_update_result({Responses, BadNodes}) ->
    Failures = [R || R <- Responses, R =/= ok],
    Unreachable = [{badnode, N} || N <- BadNodes],
    case {Failures, Unreachable} of
        {[], []} ->
            {204};
        {_, []} ->
            {400, #{code => 'BAD_CONFIG', message => readable_error_msg(Failures)}};
        {_, _} ->
            AllErrors = Failures ++ Unreachable,
            {500, #{code => 'INTERNAL_ERROR', message => readable_error_msg(AllErrors)}}
    end.
787

788
%% Response body used when a requested plugin does not exist.
plugin_not_found_msg() ->
    Code = 'NOT_FOUND',
    Message = <<"Plugin Not Found">>,
    #{code => Code, message => Message}.
793

794
%% Local shorthand for emqx_utils:readable_error_msg/1.
readable_error_msg(Reason) ->
    emqx_utils:readable_error_msg(Reason).
3✔
796

797
%% Parse the `<<"position">>' field of a request body for the plugin `Name'.
%% Returns:
%%  - `front' / `rear' for <<"front">> / <<"rear">>
%%  - `{before, Target}' for <<"before:Target">>
%%  - `{behind, Target}' for <<"after:Target">>
%%    (`Target' is converted with binary_to_list/1)
%%  - `{error, Message}' when the target is `Name' itself, when the target is
%%    empty, or when the input is anything else; in the last case the message
%%    is the `~p' rendering of the whole first argument.
parse_position(#{<<"position">> := Position} = Params, Name) ->
    case Position of
        <<"front">> ->
            front;
        <<"rear">> ->
            rear;
        %% The self-reference checks come before the empty-target checks, so
        %% an empty `Name' with an empty target reports "itself".
        <<"before:", Target/binary>> when Target =:= Name ->
            {error, <<"Invalid parameter. Cannot be placed before itself">>};
        <<"after:", Target/binary>> when Target =:= Name ->
            {error, <<"Invalid parameter. Cannot be placed after itself">>};
        <<"before:">> ->
            {error, <<"Invalid parameter. Cannot be placed before an empty target">>};
        <<"after:">> ->
            {error, <<"Invalid parameter. Cannot be placed after an empty target">>};
        <<"before:", Target/binary>> ->
            {before, binary_to_list(Target)};
        <<"after:", Target/binary>> ->
            {behind, binary_to_list(Target)};
        _ ->
            invalid_position(Params)
    end;
parse_position(Params, _Name) ->
    invalid_position(Params).

%% Error result carrying the printed form of the rejected input.
invalid_position(Params) ->
    {error, iolist_to_binary(io_lib:format("~p", [Params]))}.
×
815

816
%% Merge per-node plugin listings into one plugin list annotated with the
%% status collected from every node.
%% `List' is a list of `{Node, Plugins}' pairs. The status map is built from
%% the unsorted input; the pairs are then ordered from the longest plugin
%% list to the shortest before being packed by pack_status_in_order/2.
format_plugins(List) ->
    StatusMap = aggregate_status(List),
    %% lists:sort/2 requires the ordering fun to return `true' when the first
    %% element compares less than OR EQUAL to the second. A strict `>' returns
    %% `false' for equally long lists, which breaks that contract; `>=' keeps
    %% the descending order and leaves ties in their input order.
    LongestFirst = fun({_N1, P1}, {_N2, P2}) -> length(P1) >= length(P2) end,
    Sorted = lists:sort(LongestFirst, List),
    pack_status_in_order(Sorted, StatusMap).
22✔
821

822
%% Walk the `{Node, Plugins}' pairs in the given order and let
%% pack_plugin_in_order/3 attach the aggregated status to each plugin.
%% The status map is threaded through the fold together with the packed
%% plugins; only the packed plugins are returned, restored to the order in
%% which they were packed.
pack_status_in_order(List, StatusMap) ->
    Step = fun({_Node, NodePlugins}, {Packed, Remaining}) ->
        pack_plugin_in_order(NodePlugins, Packed, Remaining)
    end,
    {PackedRev, _Unclaimed} = lists:foldl(Step, {[], StatusMap}, List),
    lists:reverse(PackedRev).
22✔
832

833
%% Move plugins from the input list onto `Acc' (newest first), attaching the
%% status stored under `{Name, Vsn}' in `StatusAcc'.
%%  - A plugin whose key is present gets `running_status' replaced by the
%%    stored status and `config_status' dropped; its key is then removed from
%%    the status map, so each key is packed at most once.
%%  - A plugin whose key is absent is skipped.
%%  - Processing stops early once the status map is empty.
%% Returns `{Acc, StatusAcc}' with the updated accumulators.
pack_plugin_in_order([], Acc, StatusAcc) ->
    {Acc, StatusAcc};
pack_plugin_in_order(_Unvisited, Acc, StatusAcc) when StatusAcc =:= #{} ->
    {Acc, StatusAcc};
pack_plugin_in_order([Plugin | Rest], Acc, StatusAcc) ->
    #{name := Name, rel_vsn := Vsn} = Plugin,
    %% maps:take/2 looks the key up and removes it in a single step.
    case maps:take({Name, Vsn}, StatusAcc) of
        {Status, Remaining} ->
            Packed = (maps:remove(config_status, Plugin))#{running_status => Status},
            pack_plugin_in_order(Rest, [Packed | Acc], Remaining);
        error ->
            pack_plugin_in_order(Rest, Acc, StatusAcc)
    end.
847

848
%% Collect, for every `{Name, Vsn}' found in the `{Node, Plugins}' pairs, a
%% list of per-node status entries. Each entry is a map with `node' and
%% `status' (from plugin_status/1), extended by add_health_status/2.
aggregate_status(List) ->
    aggregate_status(List, #{}).

aggregate_status([], StatusMap) ->
    StatusMap;
aggregate_status([{Node, Plugins} | Rest], StatusMap) ->
    Collect = fun(Plugin, Pending) ->
        #{name := Name, rel_vsn := Vsn} = Plugin,
        Key = {Name, Vsn},
        Base = #{
            node => Node,
            status => plugin_status(Plugin)
        },
        Entry = add_health_status(Base, Plugin),
        %% The earlier entries are read from `StatusMap', i.e. the map as it
        %% was before this node was processed, not from `Pending'. A key
        %% repeated within one node's list therefore keeps only the last
        %% entry of that node.
        Earlier = maps:get(Key, StatusMap, []),
        Pending#{Key => [Entry | Earlier]}
    end,
    aggregate_status(Rest, lists:foldl(Collect, StatusMap, Plugins)).
29✔
869

870
%% Return `#{avsc => _, i18n => _}' for plugin `NameVsn'.
%% On the `ee' edition both values come from emqx_plugins (schema and i18n),
%% each mapped through or_null/1; on the `ce' edition both are `null'.
-dialyzer({nowarn_function, format_plugin_avsc_and_i18n/1}).
format_plugin_avsc_and_i18n(NameVsn) ->
    Edition = emqx_release:edition(),
    case Edition of
        ce ->
            #{avsc => null, i18n => null};
        ee ->
            Avsc = or_null(emqx_plugins:plugin_schema(NameVsn)),
            I18n = or_null(emqx_plugins:plugin_i18n(NameVsn)),
            #{avsc => Avsc, i18n => I18n}
    end.
881

882
%% Unwrap `{ok, Value}' to `Value'; every other term becomes `null'.
or_null(Result) ->
    case Result of
        {ok, Value} -> Value;
        _Other -> null
    end.
×
884

885
%% Convert an atom, a list or a binary to a binary.
%% Binaries are returned unchanged, lists go through list_to_binary/1 and
%% atoms through atom_to_binary/2 with utf8 encoding.
bin(Binary) when is_binary(Binary) ->
    Binary;
bin(List) when is_list(List) ->
    list_to_binary(List);
bin(Atom) when is_atom(Atom) ->
    atom_to_binary(Atom, utf8).
×
888

889
%% running_status: running, loaded, stopped
890
%% config_status: not_configured, disable, enable
891
%% Collapse a plugin's `running_status' to `running' or `stopped':
%% only an exact `running' counts as running, anything else is `stopped'.
plugin_status(#{running_status := running}) ->
    running;
plugin_status(_NotRunning) ->
    stopped.
17✔
893

894
%% Copy `health_status' from the plugin into `StatusInfo' when the plugin
%% carries that key; otherwise return `StatusInfo' untouched.
add_health_status(StatusInfo, Plugin) ->
    case Plugin of
        #{health_status := Health} ->
            StatusInfo#{health_status => Health};
        _ ->
            StatusInfo
    end.
17✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc