• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx-operator / 6245615479

20 Sep 2023 07:53AM UTC coverage: 73.273%. First build
6245615479

Pull #949

github

Rory-Z
fix: delete lastUpdateTime in status to fix resourceVersion frequent updates

Signed-off-by: Rory Z <16801068+Rory-Z@users.noreply.github.com>
Pull Request #949: Patch/ecp

15 of 18 new or added lines in 3 files covered. (83.33%)

1793 of 2447 relevant lines covered (73.27%)

1.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.58
/controllers/apps/v1beta3/emqx_handler.go
1
/*
2
Copyright 2021.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package apps
18

19
import (
20
        "context"
21
        "encoding/base64"
22
        "fmt"
23
        "net"
24
        "path/filepath"
25
        "reflect"
26
        "regexp"
27
        "sort"
28
        "strconv"
29
        "strings"
30
        "time"
31

32
        emperror "emperror.dev/errors"
33
        appsv1 "k8s.io/api/apps/v1"
34
        corev1 "k8s.io/api/core/v1"
35
        k8sErrors "k8s.io/apimachinery/pkg/api/errors"
36
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37

38
        "k8s.io/apimachinery/pkg/labels"
39
        "k8s.io/apimachinery/pkg/runtime"
40
        "k8s.io/apimachinery/pkg/types"
41
        "k8s.io/apimachinery/pkg/util/intstr"
42
        "k8s.io/client-go/tools/record"
43
        "k8s.io/client-go/util/retry"
44

45
        ctrl "sigs.k8s.io/controller-runtime"
46
        "sigs.k8s.io/controller-runtime/pkg/client"
47
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
48

49
        appsv1beta3 "github.com/emqx/emqx-operator/apis/apps/v1beta3"
50
        "github.com/emqx/emqx-operator/pkg/handler"
51
        json "github.com/json-iterator/go"
52
        "github.com/tidwall/gjson"
53
)
54

55
// Compile-time check that *EmqxBrokerReconciler implements reconcile.Reconciler.
var _ reconcile.Reconciler = (*EmqxBrokerReconciler)(nil)
56

57
// ReloaderContainerName is the fixed container name "reloader".
// NOTE(review): presumably the name given to the container built by
// generateReloaderContainer — confirm against that function.
const ReloaderContainerName = "reloader"
60

61
type EmqxReconciler struct {
62
        handler.Handler
63
        Scheme *runtime.Scheme
64
        record.EventRecorder
65
}
66

67
func (r *EmqxReconciler) Do(ctx context.Context, instance appsv1beta3.Emqx) (ctrl.Result, error) {
1✔
68
        var err error
1✔
69
        var emqxNodes []appsv1beta3.EmqxNode
1✔
70
        emqxNodes, err = r.getNodeStatusesByAPI(instance)
1✔
71
        if err != nil {
2✔
72
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedToGetNodeStatues", err.Error())
1✔
73
                condition := appsv1beta3.NewCondition(
1✔
74
                        appsv1beta3.ConditionRunning,
1✔
75
                        corev1.ConditionFalse,
1✔
76
                        "FailedToGetNodeStatues",
1✔
77
                        err.Error(),
1✔
78
                )
1✔
79
                instance.SetCondition(*condition)
1✔
80
                _ = r.Status().Update(ctx, instance)
1✔
81
        }
1✔
82
        if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
2✔
83
                _ = r.Get(ctx, client.ObjectKeyFromObject(instance), instance)
1✔
84
                instance = updateEmqxStatus(instance, emqxNodes)
1✔
85
                return r.Status().Update(ctx, instance)
1✔
86
        }); err != nil {
1✔
NEW
87
                return ctrl.Result{}, emperror.Wrap(err, "failed to update status")
×
88
        }
×
89

90
        var resources []client.Object
1✔
91
        postFn := func(client.Object) error { return nil }
2✔
92

93
        sts := generateStatefulSetDef(instance)
1✔
94

1✔
95
        storeSts := &appsv1.StatefulSet{}
1✔
96
        if err := r.Get(ctx, client.ObjectKeyFromObject(sts), storeSts); err != nil {
2✔
97
                if !k8sErrors.IsNotFound(err) {
1✔
98
                        return ctrl.Result{}, err
×
99
                }
×
100
        }
101
        // store statefulSet is exit
102
        if storeSts.Spec.PodManagementPolicy != "" {
2✔
103
                sts.Spec.PodManagementPolicy = storeSts.Spec.PodManagementPolicy
1✔
104
        }
1✔
105
        // compatible with 1.2.2
106
        if storeSts.Spec.VolumeClaimTemplates != nil {
2✔
107
                sts.Spec.VolumeClaimTemplates = storeSts.Spec.VolumeClaimTemplates
1✔
108
        }
1✔
109
        // compatible with 1.2.2
110
        if storeSts.Annotations != nil {
2✔
111
                sts.Annotations = storeSts.Annotations
1✔
112
        }
1✔
113

114
        defaultPluginsConfig := generateDefaultPluginsConfig(instance)
1✔
115
        sts = updatePluginsConfigForSts(sts, defaultPluginsConfig)
1✔
116

1✔
117
        if status := instance.GetStatus(); !status.IsPluginInitialized() {
2✔
118
                resources = append(resources, defaultPluginsConfig)
1✔
119

1✔
120
                pluginsList := &appsv1beta3.EmqxPluginList{}
1✔
121
                err = r.Client.List(ctx, pluginsList, client.InNamespace(instance.GetNamespace()))
1✔
122
                if err != nil && !k8sErrors.IsNotFound(err) {
1✔
123
                        return ctrl.Result{}, err
×
124
                }
×
125
                var condition *appsv1beta3.Condition
1✔
126
                pluginResourceList := generateInitPluginList(instance, pluginsList)
1✔
127
                resources = append(resources, pluginResourceList...)
1✔
128

1✔
129
                err = r.CreateOrUpdateList(instance, r.Scheme, resources, postFn)
1✔
130
                if err != nil {
1✔
131
                        r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedCreateOrUpdate", err.Error())
×
132
                        condition = appsv1beta3.NewCondition(
×
133
                                appsv1beta3.ConditionPluginInitialized,
×
134
                                corev1.ConditionFalse,
×
135
                                "PluginInitializeFailed",
×
136
                                err.Error(),
×
137
                        )
×
138
                        instance.SetCondition(*condition)
×
139
                        _ = r.Status().Update(ctx, instance)
×
140
                        return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, err
×
141
                }
×
142
                condition = appsv1beta3.NewCondition(
1✔
143
                        appsv1beta3.ConditionPluginInitialized,
1✔
144
                        corev1.ConditionTrue,
1✔
145
                        "PluginInitializeSuccessfully",
1✔
146
                        "All default plugins initialized",
1✔
147
                )
1✔
148
                instance.SetCondition(*condition)
1✔
149
                _ = r.Status().Update(ctx, instance)
1✔
150
                return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, nil
1✔
151
        }
152

153
        if acl := generateAcl(instance); acl != nil {
2✔
154
                resources = append(resources, acl)
1✔
155
                sts = updateAclForSts(sts, acl)
1✔
156
        }
1✔
157

158
        if loadedModules := generateLoadedModules(instance); loadedModules != nil {
2✔
159
                resources = append(resources, loadedModules)
1✔
160
                sts = updateLoadedModulesForSts(sts, loadedModules)
1✔
161
        }
1✔
162

163
        if emqxEnterprise, ok := instance.(*appsv1beta3.EmqxEnterprise); ok {
2✔
164
                var license *corev1.Secret
1✔
165
                if emqxEnterprise.GetLicense().SecretName != "" {
1✔
166
                        license = &corev1.Secret{}
×
167
                        if err := r.Client.Get(context.Background(), types.NamespacedName{Name: emqxEnterprise.GetLicense().SecretName, Namespace: emqxEnterprise.GetNamespace()}, license); err != nil {
×
168
                                return ctrl.Result{}, err
×
169
                        }
×
170
                } else {
1✔
171
                        license = generateLicense(emqxEnterprise)
1✔
172
                }
1✔
173

174
                if license != nil {
2✔
175
                        resources = append(resources, license)
1✔
176
                        sts = updateLicenseForsts(sts, license)
1✔
177
                }
1✔
178
        }
179

180
        if status := instance.GetStatus(); status.IsRunning() {
2✔
181
                serviceTemplate := instance.GetServiceTemplate()
1✔
182
                serviceTemplate.MergePorts(r.getListenerPortsByAPI(instance))
1✔
183
                instance.SetServiceTemplate(serviceTemplate)
1✔
184
                svc := generateSvc(instance)
1✔
185
                resources = append(resources, svc)
1✔
186
        }
1✔
187

188
        headlessSvc := generateHeadlessSvc(instance)
1✔
189
        sts.Spec.ServiceName = headlessSvc.Name
1✔
190
        resources = append(resources, headlessSvc)
1✔
191

1✔
192
        resources = append(resources, sts)
1✔
193

1✔
194
        if err := r.CreateOrUpdateList(instance, r.Scheme, resources, postFn); err != nil {
1✔
195
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedCreateOrUpdate", err.Error())
×
196
                condition := appsv1beta3.NewCondition(
×
197
                        appsv1beta3.ConditionRunning,
×
198
                        corev1.ConditionFalse,
×
199
                        "FailedCreateOrUpdate",
×
200
                        err.Error(),
×
201
                )
×
202
                instance.SetCondition(*condition)
×
203
                _ = r.Status().Update(ctx, instance)
×
204
                return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, err
×
205
        }
×
206

207
        emqxNodes, err = r.getNodeStatusesByAPI(instance)
1✔
208
        if err != nil {
2✔
209
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedToGetNodeStatues", err.Error())
1✔
210
                condition := appsv1beta3.NewCondition(
1✔
211
                        appsv1beta3.ConditionRunning,
1✔
212
                        corev1.ConditionFalse,
1✔
213
                        "FailedToGetNodeStatues",
1✔
214
                        err.Error(),
1✔
215
                )
1✔
216
                instance.SetCondition(*condition)
1✔
217
                _ = r.Status().Update(ctx, instance)
1✔
218
        }
1✔
219

220
        instance = updateEmqxStatus(instance, emqxNodes)
1✔
221
        if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
2✔
222
                _ = r.Get(ctx, client.ObjectKeyFromObject(instance), instance)
1✔
223
                instance = updateEmqxStatus(instance, emqxNodes)
1✔
224
                return r.Status().Update(ctx, instance)
1✔
225
        }); err != nil {
1✔
NEW
226
                return ctrl.Result{}, emperror.Wrap(err, "failed to update status")
×
227
        }
×
228

229
        if status := instance.GetStatus(); !status.IsRunning() {
2✔
230
                return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, nil
1✔
231
        }
1✔
232
        return ctrl.Result{RequeueAfter: time.Duration(20) * time.Second}, nil
1✔
233
}
234

235
func (r *EmqxReconciler) getListenerPortsByAPI(instance appsv1beta3.Emqx) []corev1.ServicePort {
1✔
236
        resp, body, err := r.Handler.RequestAPI(instance, "GET", instance.GetUsername(), instance.GetPassword(), appsv1beta3.DefaultManagementPort, "api/v4/listeners")
1✔
237
        if err != nil {
1✔
238
                return nil
×
239
        }
×
240
        if resp.StatusCode != 200 {
1✔
241
                return nil
×
242
        }
×
243

244
        ports := []corev1.ServicePort{}
1✔
245
        listeners := gjson.GetBytes(body, "data.0.listeners")
1✔
246
        for _, l := range listeners.Array() {
2✔
247
                var name string
1✔
248
                var protocol corev1.Protocol
1✔
249
                var strPort string
1✔
250
                var intPort int
1✔
251

1✔
252
                compile := regexp.MustCompile(".*(udp|dtls|sn).*")
1✔
253
                proto := gjson.Get(l.Raw, "protocol").String()
1✔
254
                if compile.MatchString(proto) {
2✔
255
                        protocol = corev1.ProtocolUDP
1✔
256
                } else {
2✔
257
                        protocol = corev1.ProtocolTCP
1✔
258
                }
1✔
259

260
                listenOn := gjson.Get(l.Raw, "listen_on").String()
1✔
261
                if strings.Contains(listenOn, ":") {
2✔
262
                        _, strPort, err = net.SplitHostPort(listenOn)
1✔
263
                        if err != nil {
1✔
264
                                strPort = listenOn
×
265
                        }
×
266
                } else {
1✔
267
                        strPort = listenOn
1✔
268
                }
1✔
269
                intPort, _ = strconv.Atoi(strPort)
1✔
270

1✔
271
                // Get name by protocol and port from API
1✔
272
                // protocol maybe like mqtt:wss:8084
1✔
273
                // protocol maybe like mqtt:tcp
1✔
274
                // We had to do something with the "protocol" to make it conform to the kubernetes service port name specification
1✔
275
                name = regexp.MustCompile(`:[\d]+`).ReplaceAllString(proto, "")
1✔
276
                name = strings.ReplaceAll(name, ":", "-")
1✔
277
                name = fmt.Sprintf("%s-%s", name, strPort)
1✔
278

1✔
279
                ports = append(ports, corev1.ServicePort{
1✔
280
                        Name:       name,
1✔
281
                        Protocol:   protocol,
1✔
282
                        Port:       int32(intPort),
1✔
283
                        TargetPort: intstr.FromInt(intPort),
1✔
284
                })
1✔
285
        }
286
        return ports
1✔
287
}
288

289
func (r *EmqxReconciler) getNodeStatusesByAPI(instance appsv1beta3.Emqx) ([]appsv1beta3.EmqxNode, error) {
1✔
290
        resp, body, err := r.Handler.RequestAPI(instance, "GET", instance.GetUsername(), instance.GetPassword(), appsv1beta3.DefaultManagementPort, "api/v4/nodes")
1✔
291
        if err != nil {
2✔
292
                return nil, err
1✔
293
        }
1✔
294
        if resp.StatusCode != 200 {
1✔
295
                return nil, fmt.Errorf("failed to get node statuses from API: %s", resp.Status)
×
296
        }
×
297

298
        emqxNodes := []appsv1beta3.EmqxNode{}
1✔
299
        data := gjson.GetBytes(body, "data")
1✔
300
        if err := json.Unmarshal([]byte(data.Raw), &emqxNodes); err != nil {
1✔
301
                return nil, fmt.Errorf("failed to unmarshal node statuses: %v", err)
×
302
        }
×
303
        return emqxNodes, nil
1✔
304
}
305

306
func generateStatefulSetDef(instance appsv1beta3.Emqx) *appsv1.StatefulSet {
2✔
307
        annotations := instance.GetAnnotations()
2✔
308
        if annotations == nil {
3✔
309
                annotations = make(map[string]string)
1✔
310
        }
1✔
311
        delete(annotations, "kubectl.kubernetes.io/last-applied-configuration")
2✔
312

2✔
313
        podTemplate := corev1.PodTemplateSpec{
2✔
314
                ObjectMeta: metav1.ObjectMeta{
2✔
315
                        Labels:      instance.GetLabels(),
2✔
316
                        Annotations: annotations,
2✔
317
                },
2✔
318
                Spec: corev1.PodSpec{
2✔
319
                        Affinity:         instance.GetAffinity(),
2✔
320
                        Tolerations:      instance.GetToleRations(),
2✔
321
                        NodeName:         instance.GetNodeName(),
2✔
322
                        NodeSelector:     instance.GetNodeSelector(),
2✔
323
                        ImagePullSecrets: instance.GetImagePullSecrets(),
2✔
324
                        SecurityContext:  instance.GetSecurityContext(),
2✔
325
                        InitContainers:   instance.GetInitContainers(),
2✔
326
                        Containers: append(
2✔
327
                                []corev1.Container{
2✔
328
                                        *generateEmqxContainer(instance),
2✔
329
                                        *generateReloaderContainer(instance),
2✔
330
                                },
2✔
331
                                instance.GetExtraContainers()...,
2✔
332
                        ),
2✔
333
                        Volumes: instance.GetExtraVolumes(),
2✔
334
                },
2✔
335
        }
2✔
336

2✔
337
        podAnnotation := podTemplate.ObjectMeta.DeepCopy().Annotations
2✔
338
        podAnnotation[handler.ManageContainersAnnotation] = generateAnnotationByContainers(podTemplate.Spec.Containers)
2✔
339
        podTemplate.Annotations = podAnnotation
2✔
340

2✔
341
        sts := &appsv1.StatefulSet{
2✔
342
                TypeMeta: metav1.TypeMeta{
2✔
343
                        APIVersion: "apps/v1",
2✔
344
                        Kind:       "StatefulSet",
2✔
345
                },
2✔
346
                ObjectMeta: metav1.ObjectMeta{
2✔
347
                        Name:        instance.GetName(),
2✔
348
                        Namespace:   instance.GetNamespace(),
2✔
349
                        Labels:      instance.GetLabels(),
2✔
350
                        Annotations: annotations,
2✔
351
                },
2✔
352
                Spec: appsv1.StatefulSetSpec{
2✔
353
                        Replicas: instance.GetReplicas(),
2✔
354
                        Selector: &metav1.LabelSelector{
2✔
355
                                MatchLabels: instance.GetLabels(),
2✔
356
                        },
2✔
357
                        PodManagementPolicy: appsv1.ParallelPodManagement,
2✔
358
                        Template:            podTemplate,
2✔
359
                },
2✔
360
        }
2✔
361

2✔
362
        sts = generateDataVolume(instance, sts)
2✔
363

2✔
364
        return sts
2✔
365
}
366

367
func generateInitPluginList(instance appsv1beta3.Emqx, existPluginList *appsv1beta3.EmqxPluginList) []client.Object {
2✔
368
        matchedPluginList := []appsv1beta3.EmqxPlugin{}
2✔
369
        for _, existPlugin := range existPluginList.Items {
3✔
370
                selector, _ := labels.ValidatedSelectorFromSet(existPlugin.Spec.Selector)
1✔
371
                if selector.Empty() || !selector.Matches(labels.Set(instance.GetLabels())) {
2✔
372
                        continue
1✔
373
                }
374
                matchedPluginList = append(matchedPluginList, existPlugin)
1✔
375
        }
376

377
        isExistPlugin := func(pluginName string, pluginList []appsv1beta3.EmqxPlugin) bool {
4✔
378
                for _, plugin := range pluginList {
3✔
379
                        if plugin.Spec.PluginName == pluginName {
2✔
380
                                return true
1✔
381
                        }
1✔
382
                }
383
                return false
2✔
384
        }
385

386
        pluginList := []client.Object{}
2✔
387
        // Default plugins
2✔
388
        if !isExistPlugin("emqx_rule_engine", matchedPluginList) {
4✔
389
                emqxRuleEngine := &appsv1beta3.EmqxPlugin{
2✔
390
                        TypeMeta: metav1.TypeMeta{
2✔
391
                                APIVersion: "apps.emqx.io/v1beta3",
2✔
392
                                Kind:       "EmqxPlugin",
2✔
393
                        },
2✔
394
                        ObjectMeta: metav1.ObjectMeta{
2✔
395
                                Name:      fmt.Sprintf("%s-rule-engine", instance.GetName()),
2✔
396
                                Namespace: instance.GetNamespace(),
2✔
397
                                Labels:    instance.GetLabels(),
2✔
398
                        },
2✔
399
                        Spec: appsv1beta3.EmqxPluginSpec{
2✔
400
                                PluginName: "emqx_rule_engine",
2✔
401
                                Selector:   instance.GetLabels(),
2✔
402
                                Config:     map[string]string{},
2✔
403
                        },
2✔
404
                }
2✔
405
                pluginList = append(pluginList, emqxRuleEngine)
2✔
406
        }
2✔
407

408
        if !isExistPlugin("emqx_retainer", matchedPluginList) {
4✔
409
                emqxRetainer := &appsv1beta3.EmqxPlugin{
2✔
410
                        TypeMeta: metav1.TypeMeta{
2✔
411
                                APIVersion: "apps.emqx.io/v1beta3",
2✔
412
                                Kind:       "EmqxPlugin",
2✔
413
                        },
2✔
414
                        ObjectMeta: metav1.ObjectMeta{
2✔
415
                                Name:      fmt.Sprintf("%s-retainer", instance.GetName()),
2✔
416
                                Namespace: instance.GetNamespace(),
2✔
417
                                Labels:    instance.GetLabels(),
2✔
418
                        },
2✔
419
                        Spec: appsv1beta3.EmqxPluginSpec{
2✔
420
                                PluginName: "emqx_retainer",
2✔
421
                                Selector:   instance.GetLabels(),
2✔
422
                                Config:     map[string]string{},
2✔
423
                        },
2✔
424
                }
2✔
425
                pluginList = append(pluginList, emqxRetainer)
2✔
426
        }
2✔
427

428
        enterprise, ok := instance.(*appsv1beta3.EmqxEnterprise)
2✔
429
        if ok && !isExistPlugin("emqx_modules", matchedPluginList) {
4✔
430
                emqxModules := &appsv1beta3.EmqxPlugin{
2✔
431
                        TypeMeta: metav1.TypeMeta{
2✔
432
                                APIVersion: "apps.emqx.io/v1beta3",
2✔
433
                                Kind:       "EmqxPlugin",
2✔
434
                        },
2✔
435
                        ObjectMeta: metav1.ObjectMeta{
2✔
436
                                Name:      fmt.Sprintf("%s-modules", instance.GetName()),
2✔
437
                                Namespace: instance.GetNamespace(),
2✔
438
                                Labels:    instance.GetLabels(),
2✔
439
                        },
2✔
440
                        Spec: appsv1beta3.EmqxPluginSpec{
2✔
441
                                PluginName: "emqx_modules",
2✔
442
                                Selector:   instance.GetLabels(),
2✔
443
                                Config:     map[string]string{},
2✔
444
                        },
2✔
445
                }
2✔
446

2✔
447
                if enterprise.Spec.EmqxTemplate.Modules != nil {
4✔
448
                        emqxModules.Spec.Config = map[string]string{
2✔
449
                                "modules.loaded_file": "/mounted/modules/loaded_modules",
2✔
450
                        }
2✔
451
                }
2✔
452

453
                pluginList = append(pluginList, emqxModules)
2✔
454
        }
455

456
        return pluginList
2✔
457
}
458

459
func generateDefaultPluginsConfig(instance appsv1beta3.Emqx) *corev1.ConfigMap {
2✔
460
        names := appsv1beta3.Names{Object: instance}
2✔
461

2✔
462
        cm := &corev1.ConfigMap{
2✔
463
                TypeMeta: metav1.TypeMeta{
2✔
464
                        APIVersion: "v1",
2✔
465
                        Kind:       "ConfigMap",
2✔
466
                },
2✔
467
                ObjectMeta: metav1.ObjectMeta{
2✔
468
                        Labels:    instance.GetLabels(),
2✔
469
                        Namespace: instance.GetNamespace(),
2✔
470
                        Name:      names.PluginsConfig(),
2✔
471
                },
2✔
472
                Data: map[string]string{
2✔
473
                        "emqx_modules.conf":           "",
2✔
474
                        "emqx_management.conf":        "management.listener.http = 8081\n",
2✔
475
                        "emqx_dashboard.conf":         "dashboard.listener.http = 18083\n",
2✔
476
                        "emqx_rule_engine.conf":       "",
2✔
477
                        "emqx_retainer.conf":          "",
2✔
478
                        "emqx_telemetry.conf":         "",
2✔
479
                        "emqx_auth_http.conf":         "auth.http.auth_req.url = http://127.0.0.1:80/mqtt/auth\nauth.http.auth_req.method = post\nauth.http.auth_req.headers.content_type = application/x-www-form-urlencoded\nauth.http.auth_req.params = clientid=%c,username=%u,password=%P\nauth.http.acl_req.url = http://127.0.0.1:80/mqtt/acl\nauth.http.acl_req.method = post\nauth.http.acl_req.headers.content-type = application/x-www-form-urlencoded\nauth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t,mountpoint=%m\nauth.http.timeout = 5s\nauth.http.connect_timeout = 5s\nauth.http.pool_size = 32\nauth.http.enable_pipelining = true\n",
2✔
480
                        "emqx_auth_jwt.conf":          "auth.jwt.secret = emqxsecret\nauth.jwt.from = password\nauth.jwt.verify_claims = off\n",
2✔
481
                        "emqx_auth_ldap.conf":         "auth.ldap.servers = 127.0.0.1\nauth.ldap.port = 389\nauth.ldap.pool = 8\nauth.ldap.bind_dn = cn=root,dc=emqx,dc=io\nauth.ldap.bind_password = public\nauth.ldap.timeout = 30s\nauth.ldap.device_dn = ou=device,dc=emqx,dc=io\nauth.ldap.match_objectclass = mqttUser\nauth.ldap.username.attributetype = uid\nauth.ldap.password.attributetype = userPassword\nauth.ldap.ssl = false\n",
2✔
482
                        "emqx_auth_mnesia.conf":       "",
2✔
483
                        "emqx_auth_mongo.conf":        "auth.mongo.type = single\nauth.mongo.srv_record = false\nauth.mongo.server = 127.0.0.1:27017\nauth.mongo.pool = 8\nauth.mongo.database = mqtt\nauth.mongo.topology.pool_size = 1\nauth.mongo.topology.max_overflow = 0\nauth.mongo.auth_query.password_hash = sha256\nauth.mongo.auth_query.collection = mqtt_user\nauth.mongo.auth_query.password_field = password\nauth.mongo.auth_query.selector = username=%u\nauth.mongo.super_query.collection = mqtt_user\nauth.mongo.super_query.super_field = is_superuser\nauth.mongo.super_query.selector = username=%u\nauth.mongo.acl_query.collection = mqtt_acl\nauth.mongo.acl_query.selector = username=%u\n",
2✔
484
                        "emqx_auth_mysql.conf":        "auth.mysql.server = 127.0.0.1:3306\nauth.mysql.pool = 8\nauth.mysql.database = mqtt\nauth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1\nauth.mysql.password_hash = sha256\nauth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1\nauth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'\n",
2✔
485
                        "emqx_auth_pgsql.conf":        "auth.pgsql.server = 127.0.0.1:5432\nauth.pgsql.pool = 8\nauth.pgsql.username = root\nauth.pgsql.database = mqtt\nauth.pgsql.encoding = utf8\nauth.pgsql.ssl = off\nauth.pgsql.auth_query = select password from mqtt_user where username = '%u' limit 1\nauth.pgsql.password_hash = sha256\nauth.pgsql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1\nauth.pgsql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'\n",
2✔
486
                        "emqx_auth_redis.conf":        "auth.redis.type = single\nauth.redis.server = 127.0.0.1:6379\nauth.redis.pool = 8\nauth.redis.database = 0\nauth.redis.auth_cmd = HMGET mqtt_user:%u password\nauth.redis.password_hash = plain\nauth.redis.super_cmd = HGET mqtt_user:%u is_superuser\nauth.redis.acl_cmd = HGETALL mqtt_acl:%u\n",
2✔
487
                        "emqx_backend_cassa.conf":     "backend.ecql.pool1.nodes = 127.0.0.1:9042\nbackend.ecql.pool1.size = 8\nbackend.ecql.pool1.auto_reconnect = 1\nbackend.ecql.pool1.username = cassandra\nbackend.ecql.pool1.password = cassandra\nbackend.ecql.pool1.keyspace = mqtt\nbackend.ecql.pool1.logger = info\nbackend.cassa.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscription_lookup\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"offline_opts\": {\"max_returned_count\": 500, \"time_range\": \"2h\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.session.subscribed.2  = {\"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"cql\": [\"delete from acked where clientid = ${clientid} and topic = ${topic}\"]}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
2✔
488
                        "emqx_backend_dynamo.conf":    "backend.dynamo.region = us-west-2\nbackend.dynamo.pool1.server = http://localhost:8000\nbackend.dynamo.pool1.pool_size = 8\nbackend.dynamo.pool1.aws_access_key_id = FAKE_AWS_ACCESS_KEY_ID\nbackend.dynamo.pool1.aws_secret_access_key = FAKE_AWS_SECRET_ACCESS_KEY\nbackend.dynamo.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch_for_queue\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"function\": \"on_acked_delete\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked_for_queue\"}, \"pool\": \"pool1\"}\n",
2✔
489
                        "emqx_backend_influxdb.conf":  "backend.influxdb.udp.pool1.server = 127.0.0.1:8089\nbackend.influxdb.udp.pool1.pool_size = 8\nbackend.influxdb.http.pool1.server = 127.0.0.1:8086\nbackend.influxdb.http.pool1.pool_size = 8\nbackend.influxdb.http.pool1.precision = ms\nbackend.influxdb.http.pool1.database = mydb\nbackend.influxdb.http.pool1.https_enabled = false\nbackend.influxdb.hook.message.publish.1 = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\n",
2✔
490
                        "emqx_backend_mongo.conf":     "backend.mongo.pool1.type = single\nbackend.mongo.pool1.srv_record = false\nbackend.mongo.pool1.server = 127.0.0.1:27017\nbackend.mongo.pool1.c_pool_size = 8\nbackend.mongo.pool1.database = mqtt\nbackend.mongo.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"pool\": \"pool1\", \"offline_opts\": {\"time_range\": \"2h\", \"max_returned_count\": 500}}\nbackend.mongo.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"function\": \"on_acked_delete\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
2✔
491
                        "emqx_backend_mysql.conf":     "backend.mysql.pool1.server = 127.0.0.1:3306\nbackend.mysql.pool1.pool_size = 8\nbackend.mysql.pool1.user = root\nbackend.mysql.pool1.password = public\nbackend.mysql.pool1.database = mqtt\nbackend.mysql.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.client.connected.2     = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"offline_opts\": {\"max_returned_count\": 500, \"time_range\": \"2h\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"sql\": [\"delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}\"]}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
2✔
492
                        "emqx_backend_opentsdb.conf":  "backend.opentsdb.pool1.server = 127.0.0.1:4242\nbackend.opentsdb.pool1.pool_size = 8\nbackend.opentsdb.pool1.summary = true\nbackend.opentsdb.pool1.details = false\nbackend.opentsdb.pool1.sync = false\nbackend.opentsdb.pool1.sync_timeout = 0\nbackend.opentsdb.pool1.max_batch_size = 20\nbackend.opentsdb.hook.message.publish.1 = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\n",
2✔
493
                        "emqx_backend_pgsql.conf":     "backend.pgsql.pool1.server = 127.0.0.1:5432\nbackend.pgsql.pool1.pool_size = 8\nbackend.pgsql.pool1.username = root\nbackend.pgsql.pool1.password = public\nbackend.pgsql.pool1.database = mqtt\nbackend.pgsql.pool1.ssl = false\nbackend.pgsql.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.client.connected.2     = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"offline_opts\": {\"max_returned_count\": 500, \"time_range\": \"2h\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"sql\": \"delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
2✔
494
                        "emqx_backend_redis.conf":     "backend.redis.pool1.type = single\nbackend.redis.pool1.server = 127.0.0.1:6379\nbackend.redis.pool1.pool_size = 8\nbackend.redis.pool1.database = 0\nbackend.redis.pool1.channel = mqtt_channel\nbackend.redis.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.subscribed.1  = {\"topic\": \"queue/#\", \"action\": {\"function\": \"on_message_fetch_for_queue\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.subscribed.2  = {\"topic\": \"pubsub/#\", \"action\": {\"function\": \"on_message_fetch_for_pubsub\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.subscribed.3  = {\"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"commands\": [\"DEL mqtt:acked:${clientid}:${topic}\"]}, \"pool\": \"pool1\"}\nbackend.redis.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"expired_time\" : 3600, \"pool\": \"pool1\"}\nbackend.redis.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"expired_time\" : 3600, \"pool\": \"pool1\"}\nbackend.redis.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.message.acked.1       = {\"topic\": \"queue/#\", \"action\": {\"function\": \"on_message_acked_for_queue\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.message.acked.2       = {\"topic\": \"pubsub/#\", \"action\": {\"function\": \"on_message_acked_for_pubsub\"}, \"pool\": \"pool1\"}\n",
2✔
495
                        "emqx_backend_timescale.conf": "backend.timescale.pool1.server = 127.0.0.1:5432\nbackend.timescale.pool1.pool_size = 8\nbackend.timescale.pool1.username = root\nbackend.timescale.pool1.password = public\nbackend.timescale.pool1.database = mqtt\nbackend.timescale.pool1.ssl = false\nbackend.timescale.hook.message.publish.1 = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\n",
2✔
496
                        "emqx_bridge_kafka.conf":      "bridge.kafka.servers = 127.0.0.1:9092\nbridge.kafka.query_api_versions = true\nbridge.kafka.connection_strategy = per_partition\nbridge.kafka.min_metadata_refresh_interval = 5S\nbridge.kafka.produce = sync\nbridge.kafka.produce.sync_timeout = 3s\nbridge.kafka.sock.sndbuf = 1MB\nbridge.kafka.hook.client.connected.1     = {\"topic\":\"ClientConnected\"}\nbridge.kafka.hook.client.disconnected.1  = {\"topic\":\"ClientDisconnected\"}\nbridge.kafka.hook.session.subscribed.1   = {\"filter\":\"#\", \"topic\":\"SessionSubscribed\"}\nbridge.kafka.hook.session.unsubscribed.1 = {\"filter\":\"#\", \"topic\":\"SessionUnsubscribed\"}\nbridge.kafka.hook.message.publish.1      = {\"filter\":\"#\", \"topic\":\"MessagePublish\"}\nbridge.kafka.hook.message.delivered.1    = {\"filter\":\"#\", \"topic\":\"MessageDelivered\"}\nbridge.kafka.hook.message.acked.1        = {\"filter\":\"#\", \"topic\":\"MessageAcked\"}\n",
2✔
497
                        "emqx_bridge_mqtt.conf":       "bridge.mqtt.aws.address = 127.0.0.1:1883\nbridge.mqtt.aws.proto_ver = mqttv4\nbridge.mqtt.aws.start_type = manual\nbridge.mqtt.aws.clientid = bridge_aws\nbridge.mqtt.aws.clean_start = true\nbridge.mqtt.aws.username = user\nbridge.mqtt.aws.password = passwd\nbridge.mqtt.aws.forwards = topic1/#,topic2/#\nbridge.mqtt.aws.forward_mountpoint = bridge/aws/${node}/\nbridge.mqtt.aws.ssl = off\nbridge.mqtt.aws.cacertfile = etc/certs/cacert.pem\nbridge.mqtt.aws.certfile = etc/certs/client-cert.pem\nbridge.mqtt.aws.keyfile = etc/certs/client-key.pem\nbridge.mqtt.aws.ciphers = TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256,TLS_CHACHA20_POLY1305_SHA256,TLS_AES_128_CCM_SHA256,TLS_AES_128_CCM_8_SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\nbridge.mqtt.aws.keepalive = 60s\nbridge.mqtt.aws.tls_versions = tlsv1.3,tlsv1.2,tlsv1.1,tlsv1\nbridge.mqtt.aws.reconnect_interval = 30s\nbridge.mqtt.aws.retry_interval = 20s\nbridge.mqtt.aws.batch_size = 32\nbridge.mqtt.aws.max_inflight_size = 32\nbridge.mqtt.aws.queue.replayq_dir = data/replayq/emqx_aws_bridge/\nbridge.mqtt.aws.queue.replayq_seg_bytes = 10MB\nbridge.mqtt.aws.queue.max_total_size = 
5GB\n",
2✔
498
                        "emqx_bridge_pulsar.conf":     "bridge.pulsar.servers = 127.0.0.1:6650\nbridge.pulsar.produce = sync\nbridge.pulsar.sock.sndbuf = 1MB\nbridge.pulsar.hook.client.connected.1     = {\"topic\":\"ClientConnected\"}\nbridge.pulsar.hook.client.disconnected.1  = {\"topic\":\"ClientDisconnected\"}\nbridge.pulsar.hook.session.subscribed.1   = {\"filter\":\"#\", \"topic\":\"SessionSubscribed\"}\nbridge.pulsar.hook.session.unsubscribed.1 = {\"filter\":\"#\", \"topic\":\"SessionUnsubscribed\"}\nbridge.pulsar.hook.message.publish.1      = {\"filter\":\"#\", \"topic\":\"MessagePublish\"}\nbridge.pulsar.hook.message.delivered.1      = {\"filter\":\"#\", \"topic\":\"MessageDelivered\"}\nbridge.pulsar.hook.message.acked.1        = {\"filter\":\"#\", \"topic\":\"MessageAcked\"}\n",
2✔
499
                        "emqx_bridge_rabbit.conf":     "bridge.rabbit.1.server = 127.0.0.1:5672\nbridge.rabbit.1.pool_size = 8\nbridge.rabbit.1.username = guest\nbridge.rabbit.1.password = guest\nbridge.rabbit.1.timeout = 5s\nbridge.rabbit.1.virtual_host = /\nbridge.rabbit.1.heartbeat = 30s\nbridge.rabbit.hook.session.subscribed.1 = {\"action\": \"on_session_subscribed\", \"rabbit\": 1, \"exchange\": \"direct:emqx.subscription\"}\nbridge.rabbit.hook.session.unsubscribed.1 = {\"action\": \"on_session_unsubscribed\", \"rabbit\": 1, \"exchange\": \"direct:emqx.subscription\"}\nbridge.rabbit.hook.message.publish.1 = {\"topic\": \"$SYS/#\", \"action\": \"on_message_publish\", \"rabbit\": 1, \"exchange\": \"topic:emqx.$sys\"}\nbridge.rabbit.hook.message.publish.2 = {\"topic\": \"#\", \"action\": \"on_message_publish\", \"rabbit\": 1, \"exchange\": \"topic:emqx.pub\"}\nbridge.rabbit.hook.message.acked.1 = {\"topic\": \"#\", \"action\": \"on_message_acked\", \"rabbit\": 1, \"exchange\": \"topic:emqx.acked\"}\n",
2✔
500
                        "emqx_bridge_rocket.conf":     "bridge.rocket.servers = 127.0.0.1:9876\nbridge.rocket.refresh_topic_route_interval = 5S\nbridge.rocket.produce = sync\nbridge.rocket.sock.sndbuf = 1MB\nbridge.rocket.hook.client.connected.1     = {\"topic\":\"ClientConnected\"}\nbridge.rocket.hook.client.disconnected.1  = {\"topic\":\"ClientDisconnected\"}\nbridge.rocket.hook.session.subscribed.1   = {\"filter\":\"#\", \"topic\":\"SessionSubscribed\"}\nbridge.rocket.hook.session.unsubscribed.1 = {\"filter\":\"#\", \"topic\":\"SessionUnsubscribed\"}\nbridge.rocket.hook.message.publish.1      = {\"filter\":\"#\", \"topic\":\"MessagePublish\"}\nbridge.rocket.hook.message.delivered.1    = {\"filter\":\"#\", \"topic\":\"MessageDeliver\"}\nbridge.rocket.hook.message.acked.1        = {\"filter\":\"#\", \"topic\":\"MessageAcked\"}\n",
2✔
501
                        "emqx_coap.conf":              "coap.bind.udp.1 = 0.0.0.0:5683\ncoap.enable_stats = off\ncoap.bind.dtls.1 = 0.0.0.0:5684\ncoap.dtls.keyfile = etc/certs/key.pem\ncoap.dtls.certfile = etc/certs/cert.pem\ncoap.dtls.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\n",
2✔
502
                        "emqx_conf.conf":              "conf.etc.dir.emqx = etc\nconf.etc.dir.emqx.zones = etc\nconf.etc.dir.emqx.listeners = etc\nconf.etc.dir.emqx.sys_mon = etc\n",
2✔
503
                        "emqx_exhook.conf":            "exhook.server.default.url = http://127.0.0.1:9000\n",
2✔
504
                        "emqx_exproto.conf":           "exproto.server.http.port = 9100\nexproto.server.https.port = 9101\nexproto.server.https.cacertfile = etc/certs/cacert.pem\nexproto.server.https.certfile = etc/certs/cert.pem\nexproto.server.https.keyfile = etc/certs/key.pem\nexproto.listener.protoname = tcp://0.0.0.0:7993\nexproto.listener.protoname.connection_handler_url = http://127.0.0.1:9001\nexproto.listener.protoname.acceptors = 8\nexproto.listener.protoname.max_connections = 1024000\nexproto.listener.protoname.max_conn_rate = 1000\nexproto.listener.protoname.active_n = 100\nexproto.listener.protoname.idle_timeout = 30s\nexproto.listener.protoname.access.1 = allow all\nexproto.listener.protoname.backlog = 1024\nexproto.listener.protoname.send_timeout = 15s\nexproto.listener.protoname.send_timeout_close = on\nexproto.listener.protoname.nodelay = true\nexproto.listener.protoname.reuseaddr = true\n",
2✔
505
                        "emqx_gbt32960.conf":          "gbt32960.frame.max_length = 8192\ngbt32960.proto.retx_interval = 8s\ngbt32960.proto.retx_max_times = 3\ngbt32960.proto.message_queue_len = 10\ngbt32960.listener.tcp = 0.0.0.0:7325\ngbt32960.listener.tcp.acceptors = 8\ngbt32960.listener.tcp.max_connections = 1024000\ngbt32960.listener.tcp.max_conn_rate = 1000\ngbt32960.listener.tcp.idle_timeout = 60s\ngbt32960.listener.tcp.active_n = 100\ngbt32960.listener.tcp.zone = external\ngbt32960.listener.tcp.access.1 = allow all\ngbt32960.listener.tcp.backlog = 1024\ngbt32960.listener.tcp.send_timeout = 15s\ngbt32960.listener.tcp.send_timeout_close = on\ngbt32960.listener.tcp.nodelay = true\ngbt32960.listener.tcp.reuseaddr = true\ngbt32960.listener.ssl = 7326\ngbt32960.listener.ssl.acceptors = 16\ngbt32960.listener.ssl.max_connections = 102400\ngbt32960.listener.ssl.max_conn_rate = 500\ngbt32960.listener.ssl.idle_timeout = 60s\ngbt32960.listener.ssl.active_n = 100\ngbt32960.listener.ssl.zone = external\ngbt32960.listener.ssl.access.1 = allow all\ngbt32960.listener.ssl.handshake_timeout = 15s\ngbt32960.listener.ssl.keyfile = etc/certs/key.pem\ngbt32960.listener.ssl.certfile = etc/certs/cert.pem\ngbt32960.listener.ssl.ciphers = 
ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\ngbt32960.listener.ssl.reuseaddr = true\n",
2✔
506
                        "emqx_jt808.conf":             "jt808.proto.allow_anonymous = true\njt808.proto.dn_topic = jt808/%c/dn\njt808.proto.up_topic = jt808/%c/up\njt808.conn.idle_timeout = 30s\njt808.conn.enable_stats = on\njt808.frame.max_length = 8192\njt808.listener.tcp = 6207\njt808.listener.tcp.acceptors = 4\njt808.listener.tcp.max_clients = 512\n",
2✔
507
                        "emqx_lua_hook.conf":          "",
2✔
508
                        "emqx_lwm2m.conf":             "lwm2m.lifetime_min = 1s\nlwm2m.lifetime_max = 86400s\nlwm2m.mountpoint = lwm2m/%e/\nlwm2m.topics.command = dn/#\nlwm2m.topics.response = up/resp\nlwm2m.topics.notify = up/notify\nlwm2m.topics.register = up/resp\nlwm2m.topics.update = up/resp\nlwm2m.xml_dir =  etc/lwm2m_xml\nlwm2m.bind.udp.1 = 0.0.0.0:5683\nlwm2m.opts.buffer = 1024KB\nlwm2m.opts.recbuf = 1024KB\nlwm2m.opts.sndbuf = 1024KB\nlwm2m.opts.read_packets = 20\nlwm2m.bind.dtls.1 = 0.0.0.0:5684\nlwm2m.dtls.keyfile = etc/certs/key.pem\nlwm2m.dtls.certfile = etc/certs/cert.pem\nlwm2m.dtls.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\n",
2✔
509
                        "emqx_prometheus.conf":        "prometheus.push.gateway.server = http://127.0.0.1:9091\nprometheus.interval = 15000\n",
2✔
510
                        "emqx_psk_file.conf":          "psk.file.path = etc/psk.txt\npsk.file.delimiter = :\n",
2✔
511
                        "emqx_recon.conf":             "",
2✔
512
                        "emqx_sasl.conf":              "",
2✔
513
                        "emqx_schema_registry.conf":   "",
2✔
514
                        "emqx_sn.conf":                "mqtt.sn.port = 1884\nmqtt.sn.advertise_duration = 15m\nmqtt.sn.gateway_id = 1\nmqtt.sn.enable_stats = off\nmqtt.sn.enable_qos3 = off\nmqtt.sn.idle_timeout = 30s\nmqtt.sn.predefined.topic.0 = reserved\nmqtt.sn.predefined.topic.1 = /predefined/topic/name/hello\nmqtt.sn.predefined.topic.2 = /predefined/topic/name/nice\nmqtt.sn.username = mqtt_sn_user\nmqtt.sn.password = abc\n",
2✔
515
                        "emqx_stomp.conf":             "stomp.listener = 61613\nstomp.listener.acceptors = 4\nstomp.listener.max_connections = 512\nstomp.default_user.login = guest\nstomp.default_user.passcode = guest\nstomp.allow_anonymous = true\nstomp.frame.max_headers = 10\nstomp.frame.max_header_length = 1024\nstomp.frame.max_body_length = 8192\n",
2✔
516
                        "emqx_tcp.conf":               "tcp.proto.idle_timeout = 15s\ntcp.proto.up_topic = tcp/%c/up\ntcp.proto.dn_topic = tcp/%c/dn\ntcp.proto.max_packet_size = 65535\ntcp.proto.enable_stats = on\ntcp.proto.force_gc_policy = 1000|1MB\ntcp.listener.external = 0.0.0.0:8090\ntcp.listener.external.acceptors = 8\ntcp.listener.external.max_connections = 1024000\ntcp.listener.external.max_conn_rate = 1000\ntcp.listener.external.active_n = 100\ntcp.listener.external.access.1 = allow all\ntcp.listener.external.backlog = 1024\ntcp.listener.external.send_timeout = 15s\ntcp.listener.external.send_timeout_close = on\ntcp.listener.external.nodelay = true\ntcp.listener.external.reuseaddr = true\ntcp.listener.ssl.external = 0.0.0.0:8091\ntcp.listener.ssl.external.acceptors = 8\ntcp.listener.ssl.external.max_connections = 1024000\ntcp.listener.ssl.external.max_conn_rate = 1000\ntcp.listener.ssl.external.active_n = 100\ntcp.listener.ssl.external.access.1 = allow all\ntcp.listener.ssl.external.handshake_timeout = 15s\ntcp.listener.ssl.external.keyfile = etc/certs/key.pem\ntcp.listener.ssl.external.certfile = etc/certs/cert.pem\ntcp.listener.ssl.external.ciphers = 
ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\ntcp.listener.ssl.external.backlog = 1024\ntcp.listener.ssl.external.send_timeout = 15s\ntcp.listener.ssl.external.send_timeout_close = on\ntcp.listener.ssl.external.nodelay = true\ntcp.listener.ssl.external.reuseaddr = true\n",
2✔
517
                        "emqx_web_hook.conf":          "web.hook.url = http://127.0.0.1:80\nweb.hook.headers.content-type = application/json\nweb.hook.body.encoding_of_payload_field = plain\nweb.hook.pool_size = 32\nweb.hook.enable_pipelining = true\n",
2✔
518
                },
2✔
519
        }
2✔
520

2✔
521
        return cm
2✔
522
}
2✔
523

524
func generateHeadlessSvc(instance appsv1beta3.Emqx) *corev1.Service {
2✔
525
        names := appsv1beta3.Names{Object: instance}
2✔
526

2✔
527
        headlessSvc := &corev1.Service{
2✔
528
                TypeMeta: metav1.TypeMeta{
2✔
529
                        APIVersion: "v1",
2✔
530
                        Kind:       "Service",
2✔
531
                },
2✔
532
                ObjectMeta: metav1.ObjectMeta{
2✔
533
                        Labels:    instance.GetLabels(),
2✔
534
                        Name:      names.HeadlessSvc(),
2✔
535
                        Namespace: instance.GetNamespace(),
2✔
536
                },
2✔
537
                Spec: corev1.ServiceSpec{
2✔
538
                        Selector:                 instance.GetLabels(),
2✔
539
                        ClusterIP:                corev1.ClusterIPNone,
2✔
540
                        PublishNotReadyAddresses: true,
2✔
541
                },
2✔
542
        }
2✔
543

2✔
544
        compile := regexp.MustCompile(".*management.*")
2✔
545
        for _, port := range instance.GetServiceTemplate().Spec.Ports {
4✔
546
                if compile.MatchString(port.Name) {
4✔
547
                        // Headless services must not set nodePort
2✔
548
                        headlessSvc.Spec.Ports = append(headlessSvc.Spec.Ports, corev1.ServicePort{
2✔
549
                                Name:        port.Name,
2✔
550
                                Protocol:    port.Protocol,
2✔
551
                                AppProtocol: port.AppProtocol,
2✔
552
                                TargetPort:  port.TargetPort,
2✔
553
                                Port:        port.Port,
2✔
554
                        })
2✔
555
                }
2✔
556
        }
557
        return headlessSvc
2✔
558
}
559

560
func generateSvc(instance appsv1beta3.Emqx) *corev1.Service {
2✔
561
        return &corev1.Service{
2✔
562
                TypeMeta: metav1.TypeMeta{
2✔
563
                        APIVersion: "v1",
2✔
564
                        Kind:       "Service",
2✔
565
                },
2✔
566
                ObjectMeta: instance.GetServiceTemplate().ObjectMeta,
2✔
567
                Spec:       instance.GetServiceTemplate().Spec,
2✔
568
        }
2✔
569
}
2✔
570

571
func generateAcl(instance appsv1beta3.Emqx) *corev1.ConfigMap {
2✔
572
        if len(instance.GetACL()) == 0 {
4✔
573
                return nil
2✔
574
        }
2✔
575
        names := appsv1beta3.Names{Object: instance}
2✔
576

2✔
577
        var aclString string
2✔
578
        for _, rule := range instance.GetACL() {
4✔
579
                aclString += fmt.Sprintf("%s\n", rule)
2✔
580
        }
2✔
581
        cm := &corev1.ConfigMap{
2✔
582
                TypeMeta: metav1.TypeMeta{
2✔
583
                        APIVersion: "v1",
2✔
584
                        Kind:       "ConfigMap",
2✔
585
                },
2✔
586
                ObjectMeta: metav1.ObjectMeta{
2✔
587
                        Labels:    instance.GetLabels(),
2✔
588
                        Namespace: instance.GetNamespace(),
2✔
589
                        Name:      names.ACL(),
2✔
590
                },
2✔
591
                Data: map[string]string{"acl.conf": aclString},
2✔
592
        }
2✔
593
        return cm
2✔
594
}
595

596
func generateLoadedModules(instance appsv1beta3.Emqx) *corev1.ConfigMap {
2✔
597
        names := appsv1beta3.Names{Object: instance}
2✔
598
        var loadedModulesString string
2✔
599
        switch obj := instance.(type) {
2✔
600
        case *appsv1beta3.EmqxBroker:
2✔
601
                modules := &appsv1beta3.EmqxBrokerModuleList{
2✔
602
                        Items: obj.Spec.EmqxTemplate.Modules,
2✔
603
                }
2✔
604
                loadedModulesString = modules.String()
2✔
605
                if loadedModulesString == "" {
3✔
606
                        return nil
1✔
607
                }
1✔
608
        case *appsv1beta3.EmqxEnterprise:
2✔
609
                modules := &appsv1beta3.EmqxEnterpriseModuleList{
2✔
610
                        Items: obj.Spec.EmqxTemplate.Modules,
2✔
611
                }
2✔
612
                // for enterprise, if modules is empty, don't create configmap
2✔
613
                loadedModulesString = modules.String()
2✔
614
                if loadedModulesString == "" {
3✔
615
                        return nil
1✔
616
                }
1✔
617
        }
618

619
        cm := &corev1.ConfigMap{
2✔
620
                TypeMeta: metav1.TypeMeta{
2✔
621
                        APIVersion: "v1",
2✔
622
                        Kind:       "ConfigMap",
2✔
623
                },
2✔
624
                ObjectMeta: metav1.ObjectMeta{
2✔
625
                        Labels:    instance.GetLabels(),
2✔
626
                        Namespace: instance.GetNamespace(),
2✔
627
                        Name:      names.LoadedModules(),
2✔
628
                },
2✔
629
                Data: map[string]string{"loaded_modules": loadedModulesString},
2✔
630
        }
2✔
631

2✔
632
        return cm
2✔
633
}
634

635
func generateLicense(emqxEnterprise *appsv1beta3.EmqxEnterprise) *corev1.Secret {
2✔
636
        names := appsv1beta3.Names{Object: emqxEnterprise}
2✔
637
        license := emqxEnterprise.GetLicense()
2✔
638
        if len(license.Data) == 0 && len(license.StringData) == 0 {
3✔
639
                return nil
1✔
640
        }
1✔
641

642
        secret := &corev1.Secret{
2✔
643
                TypeMeta: metav1.TypeMeta{
2✔
644
                        APIVersion: "v1",
2✔
645
                        Kind:       "Secret",
2✔
646
                },
2✔
647
                ObjectMeta: metav1.ObjectMeta{
2✔
648
                        Labels:    emqxEnterprise.GetLabels(),
2✔
649
                        Namespace: emqxEnterprise.GetNamespace(),
2✔
650
                        Name:      names.License(),
2✔
651
                },
2✔
652
                Type: corev1.SecretTypeOpaque,
2✔
653
                Data: map[string][]byte{"emqx.lic": emqxEnterprise.GetLicense().Data},
2✔
654
        }
2✔
655
        if emqxEnterprise.GetLicense().StringData != "" {
2✔
656
                secret.StringData = map[string]string{"emqx.lic": emqxEnterprise.GetLicense().StringData}
×
657
        }
×
658
        return secret
2✔
659
}
660

661
func generateDataVolume(instance appsv1beta3.Emqx, sts *appsv1.StatefulSet) *appsv1.StatefulSet {
2✔
662
        names := appsv1beta3.Names{Object: instance}
2✔
663
        dataName := names.Data()
2✔
664

2✔
665
        if reflect.ValueOf(instance.GetPersistent()).IsZero() {
4✔
666
                sts.Spec.Template.Spec.Volumes = append(
2✔
667
                        sts.Spec.Template.Spec.Volumes,
2✔
668
                        corev1.Volume{
2✔
669
                                Name: dataName,
2✔
670
                                VolumeSource: corev1.VolumeSource{
2✔
671
                                        EmptyDir: &corev1.EmptyDirVolumeSource{},
2✔
672
                                },
2✔
673
                        },
2✔
674
                )
2✔
675
        } else {
4✔
676
                sts.Spec.VolumeClaimTemplates = append(
2✔
677
                        sts.Spec.VolumeClaimTemplates,
2✔
678
                        generateVolumeClaimTemplate(instance, dataName),
2✔
679
                )
2✔
680
        }
2✔
681

682
        emqxContainerIndex := findContinerIndex(sts, handler.EmqxContainerName)
2✔
683
        sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts = append(
2✔
684
                sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts,
2✔
685
                corev1.VolumeMount{
2✔
686
                        Name:      dataName,
2✔
687
                        MountPath: "/opt/emqx/data",
2✔
688
                },
2✔
689
        )
2✔
690

2✔
691
        ReloaderContainerIndex := findContinerIndex(sts, ReloaderContainerName)
2✔
692
        sts.Spec.Template.Spec.Containers[ReloaderContainerIndex].VolumeMounts = sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts
2✔
693
        return sts
2✔
694
}
695

696
func generateVolumeClaimTemplate(instance appsv1beta3.Emqx, Name string) corev1.PersistentVolumeClaim {
2✔
697
        template := instance.GetPersistent()
2✔
698
        pvc := corev1.PersistentVolumeClaim{
2✔
699
                ObjectMeta: metav1.ObjectMeta{
2✔
700
                        Name:      Name,
2✔
701
                        Namespace: instance.GetNamespace(),
2✔
702
                },
2✔
703
                Spec: template,
2✔
704
        }
2✔
705
        if pvc.Spec.AccessModes == nil {
3✔
706
                pvc.Spec.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
1✔
707
        }
1✔
708
        return pvc
2✔
709
}
710

711
func generateReloaderContainer(instance appsv1beta3.Emqx) *corev1.Container {
2✔
712
        return &corev1.Container{
2✔
713
                Name:            ReloaderContainerName,
2✔
714
                Image:           instance.GetReloaderImage(),
2✔
715
                ImagePullPolicy: instance.GetImagePullPolicy(),
2✔
716
                Args: []string{
2✔
717
                        "-u", instance.GetUsername(),
2✔
718
                        "-p", instance.GetPassword(),
2✔
719
                        "-P", appsv1beta3.DefaultManagementPort,
2✔
720
                },
2✔
721
        }
2✔
722
}
2✔
723

724
func generateEmqxContainer(instance appsv1beta3.Emqx) *corev1.Container {
2✔
725
        return &corev1.Container{
2✔
726
                Name:            handler.EmqxContainerName,
2✔
727
                Image:           instance.GetImage(),
2✔
728
                ImagePullPolicy: instance.GetImagePullPolicy(),
2✔
729
                Resources:       instance.GetResource(),
2✔
730
                Env: mergeEnvAndConfig(instance, []corev1.EnvVar{
2✔
731
                        {
2✔
732
                                Name:  "EMQX_MANAGEMENT__DEFAULT_APPLICATION__ID",
2✔
733
                                Value: instance.GetUsername(),
2✔
734
                        },
2✔
735
                        {
2✔
736
                                Name:  "EMQX_DASHBOARD__DEFAULT_USER__LOGIN",
2✔
737
                                Value: instance.GetUsername(),
2✔
738
                        },
2✔
739
                        {
2✔
740
                                Name:  "EMQX_MANAGEMENT__DEFAULT_APPLICATION__SECRET",
2✔
741
                                Value: instance.GetPassword(),
2✔
742
                        },
2✔
743
                        {
2✔
744
                                Name:  "EMQX_DASHBOARD__DEFAULT_USER__PASSWORD",
2✔
745
                                Value: instance.GetPassword(),
2✔
746
                        },
2✔
747
                }...),
2✔
748
                Args:           instance.GetArgs(),
2✔
749
                ReadinessProbe: instance.GetReadinessProbe(),
2✔
750
                LivenessProbe:  instance.GetLivenessProbe(),
2✔
751
                StartupProbe:   instance.GetStartupProbe(),
2✔
752
                VolumeMounts:   instance.GetExtraVolumeMounts(),
2✔
753
        }
2✔
754
}
2✔
755

756
func updateEnvAndVolumeForSts(sts *appsv1.StatefulSet, envVar corev1.EnvVar, volumeMount corev1.VolumeMount, volume corev1.Volume) *appsv1.StatefulSet {
2✔
757
        emqxContainerIndex := findContinerIndex(sts, handler.EmqxContainerName)
2✔
758
        reloaderContainerIndex := findContinerIndex(sts, ReloaderContainerName)
2✔
759

2✔
760
        isNotExistVolume := func(volume corev1.Volume) bool {
4✔
761
                for _, v := range sts.Spec.Template.Spec.Volumes {
3✔
762
                        if v.Name == volume.Name {
1✔
763
                                return false
×
764
                        }
×
765
                }
766
                return true
2✔
767
        }
768

769
        isNotExistVolumeVolumeMount := func(volumeMount corev1.VolumeMount) bool {
4✔
770
                for _, v := range sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts {
3✔
771
                        if v.Name == volumeMount.Name {
1✔
772
                                return false
×
773
                        }
×
774
                }
775
                return true
2✔
776
        }
777

778
        isNotExistEnv := func(envVar corev1.EnvVar) bool {
4✔
779
                for _, v := range sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env {
3✔
780
                        if v.Name == envVar.Name {
1✔
781
                                return false
×
782
                        }
×
783
                }
784
                return true
2✔
785
        }
786

787
        if isNotExistVolume(volume) {
4✔
788
                sts.Spec.Template.Spec.Volumes = append(
2✔
789
                        sts.Spec.Template.Spec.Volumes,
2✔
790
                        volume,
2✔
791
                )
2✔
792
        }
2✔
793

794
        if isNotExistVolumeVolumeMount(volumeMount) {
4✔
795
                sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts = append(
2✔
796
                        sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts,
2✔
797
                        volumeMount,
2✔
798
                )
2✔
799
        }
2✔
800

801
        if isNotExistEnv(envVar) {
4✔
802
                sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env = append(
2✔
803
                        sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env,
2✔
804
                        envVar,
2✔
805
                )
2✔
806
        }
2✔
807

808
        sts.Spec.Template.Spec.Containers[reloaderContainerIndex].VolumeMounts = sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts
2✔
809
        sts.Spec.Template.Spec.Containers[reloaderContainerIndex].Env = sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env
2✔
810

2✔
811
        return sts
2✔
812
}
813

814
func mergeEnvAndConfig(instance appsv1beta3.Emqx, extraEnvs ...corev1.EnvVar) []corev1.EnvVar {
2✔
815
        lookup := func(name string, envs []corev1.EnvVar) bool {
4✔
816
                for _, env := range envs {
4✔
817
                        if env.Name == name {
3✔
818
                                return true
1✔
819
                        }
1✔
820
                }
821
                return false
2✔
822
        }
823

824
        envs := append(instance.GetEnv(), extraEnvs...)
2✔
825
        emqxConfig := instance.GetEmqxConfig()
2✔
826

2✔
827
        for k, v := range emqxConfig {
4✔
828
                key := fmt.Sprintf("EMQX_%s", strings.ToUpper(strings.ReplaceAll(k, ".", "__")))
2✔
829
                if !lookup(key, envs) {
4✔
830
                        envs = append(envs, corev1.EnvVar{Name: key, Value: v})
2✔
831
                }
2✔
832
        }
833

834
        sort.Slice(envs, func(i, j int) bool {
4✔
835
                return envs[i].Name < envs[j].Name
2✔
836
        })
2✔
837
        return envs
2✔
838
}
839

840
func findContinerIndex(sts *appsv1.StatefulSet, containerName string) int {
2✔
841
        for k, c := range sts.Spec.Template.Spec.Containers {
4✔
842
                if c.Name == containerName {
4✔
843
                        return k
2✔
844
                }
2✔
845
        }
846
        return -1
×
847
}
848

849
func generateAnnotationByContainers(containers []corev1.Container) string {
2✔
850
        containerNames := []string{}
2✔
851
        for _, c := range containers {
4✔
852
                containerNames = append(containerNames, c.Name)
2✔
853
        }
2✔
854
        return strings.Join(containerNames, ",")
2✔
855
}
856

857
func updateEmqxStatus(instance appsv1beta3.Emqx, emqxNodes []appsv1beta3.EmqxNode) appsv1beta3.Emqx {
2✔
858
        status := instance.GetStatus()
2✔
859
        status.Replicas = *instance.GetReplicas()
2✔
860
        if emqxNodes != nil {
4✔
861
                readyReplicas := int32(0)
2✔
862
                for _, node := range emqxNodes {
4✔
863
                        if node.NodeStatus == "Running" {
4✔
864
                                readyReplicas++
2✔
865
                        }
2✔
866
                }
867
                status.ReadyReplicas = readyReplicas
2✔
868
                status.EmqxNodes = emqxNodes
2✔
869
        }
870

871
        var cond *appsv1beta3.Condition
2✔
872
        if status.Replicas == status.ReadyReplicas {
4✔
873
                cond = appsv1beta3.NewCondition(
2✔
874
                        appsv1beta3.ConditionRunning,
2✔
875
                        corev1.ConditionTrue,
2✔
876
                        "ClusterReady",
2✔
877
                        "All resources are ready",
2✔
878
                )
2✔
879
        } else {
4✔
880
                cond = appsv1beta3.NewCondition(
2✔
881
                        appsv1beta3.ConditionRunning,
2✔
882
                        corev1.ConditionFalse,
2✔
883
                        "ClusterNotReady",
2✔
884
                        "Some nodes are not ready",
2✔
885
                )
2✔
886
        }
2✔
887
        status.SetCondition(*cond)
2✔
888
        instance.SetStatus(status)
2✔
889
        return instance
2✔
890
}
891

892
func updatePluginsConfigForSts(sts *appsv1.StatefulSet, PluginsConfig *corev1.ConfigMap) *appsv1.StatefulSet {
2✔
893
        return updateEnvAndVolumeForSts(sts,
2✔
894
                corev1.EnvVar{
2✔
895
                        Name:  "EMQX_PLUGINS__ETC_DIR",
2✔
896
                        Value: "/mounted/plugins/etc",
2✔
897
                },
2✔
898
                corev1.VolumeMount{
2✔
899
                        Name:      PluginsConfig.Name,
2✔
900
                        MountPath: "/mounted/plugins/etc",
2✔
901
                },
2✔
902
                corev1.Volume{
2✔
903
                        Name: PluginsConfig.Name,
2✔
904
                        VolumeSource: corev1.VolumeSource{
2✔
905
                                ConfigMap: &corev1.ConfigMapVolumeSource{
2✔
906
                                        LocalObjectReference: corev1.LocalObjectReference{
2✔
907
                                                Name: PluginsConfig.Name,
2✔
908
                                        },
2✔
909
                                },
2✔
910
                        },
2✔
911
                },
2✔
912
        )
2✔
913
}
2✔
914

915
func updateAclForSts(sts *appsv1.StatefulSet, acl *corev1.ConfigMap) *appsv1.StatefulSet {
2✔
916
        if sts.Spec.Template.Annotations == nil {
3✔
917
                sts.Spec.Template.Annotations = make(map[string]string)
1✔
918
        }
1✔
919
        sts.Spec.Template.Annotations["ACL/Base64EncodeConfig"] = base64.StdEncoding.EncodeToString([]byte(acl.Data["acl.conf"]))
2✔
920
        return updateEnvAndVolumeForSts(sts,
2✔
921
                corev1.EnvVar{
2✔
922
                        Name:  "EMQX_ACL_FILE",
2✔
923
                        Value: "/mounted/acl/acl.conf",
2✔
924
                },
2✔
925
                corev1.VolumeMount{
2✔
926
                        Name:      acl.Name,
2✔
927
                        MountPath: "/mounted/acl",
2✔
928
                },
2✔
929
                corev1.Volume{
2✔
930
                        Name: acl.Name,
2✔
931
                        VolumeSource: corev1.VolumeSource{
2✔
932
                                ConfigMap: &corev1.ConfigMapVolumeSource{
2✔
933
                                        LocalObjectReference: corev1.LocalObjectReference{
2✔
934
                                                Name: acl.Name,
2✔
935
                                        },
2✔
936
                                },
2✔
937
                        },
2✔
938
                },
2✔
939
        )
2✔
940
}
941

942
func updateLoadedModulesForSts(sts *appsv1.StatefulSet, loadedModules *corev1.ConfigMap) *appsv1.StatefulSet {
2✔
943
        if sts.Spec.Template.Annotations == nil {
3✔
944
                sts.Spec.Template.Annotations = make(map[string]string)
1✔
945
        }
1✔
946
        sts.Spec.Template.Annotations["LoadedModules/Base64EncodeConfig"] = base64.StdEncoding.EncodeToString([]byte(loadedModules.Data["loaded_modules"]))
2✔
947
        return updateEnvAndVolumeForSts(sts,
2✔
948
                corev1.EnvVar{
2✔
949
                        Name:  "EMQX_MODULES__LOADED_FILE",
2✔
950
                        Value: "/mounted/modules/loaded_modules",
2✔
951
                },
2✔
952
                corev1.VolumeMount{
2✔
953
                        Name:      loadedModules.Name,
2✔
954
                        MountPath: "/mounted/modules",
2✔
955
                },
2✔
956
                corev1.Volume{
2✔
957
                        Name: loadedModules.Name,
2✔
958
                        VolumeSource: corev1.VolumeSource{
2✔
959
                                ConfigMap: &corev1.ConfigMapVolumeSource{
2✔
960
                                        LocalObjectReference: corev1.LocalObjectReference{
2✔
961
                                                Name: loadedModules.Name,
2✔
962
                                        },
2✔
963
                                },
2✔
964
                        },
2✔
965
                },
2✔
966
        )
2✔
967
}
968

969
func updateLicenseForsts(sts *appsv1.StatefulSet, license *corev1.Secret) *appsv1.StatefulSet {
2✔
970
        fileName := "emqx.lic"
2✔
971
        for k := range license.Data {
4✔
972
                fileName = k
2✔
973
                break
2✔
974
        }
975

976
        return updateEnvAndVolumeForSts(sts,
2✔
977
                corev1.EnvVar{
2✔
978
                        Name:  "EMQX_LICENSE__FILE",
2✔
979
                        Value: filepath.Join("/mounted/license", fileName),
2✔
980
                },
2✔
981
                corev1.VolumeMount{
2✔
982
                        Name:      license.Name,
2✔
983
                        MountPath: "/mounted/license",
2✔
984
                        ReadOnly:  true,
2✔
985
                },
2✔
986
                corev1.Volume{
2✔
987
                        Name: license.Name,
2✔
988
                        VolumeSource: corev1.VolumeSource{
2✔
989
                                Secret: &corev1.SecretVolumeSource{
2✔
990
                                        SecretName: license.Name,
2✔
991
                                },
2✔
992
                        },
2✔
993
                },
2✔
994
        )
2✔
995
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc