• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 8254

29 Nov 2023 04:19PM UTC coverage: 53.771% (-0.04%) from 53.808%
8254

push

travis-pro

web-flow
Merge pull request #1205 from noironetworks/hpp-subscrib-fix

Fix for hpp subscription

9 of 11 new or added lines in 1 file covered. (81.82%)

11 existing lines in 3 files now uncovered.

13269 of 24677 relevant lines covered (53.77%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.89
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        "k8s.io/apimachinery/pkg/labels"
35
        "k8s.io/apimachinery/pkg/util/wait"
36
        "k8s.io/client-go/kubernetes"
37
        "k8s.io/client-go/tools/cache"
38
        "k8s.io/client-go/util/workqueue"
39

40
        "github.com/noironetworks/aci-containers/pkg/apicapi"
41
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
42
        "github.com/noironetworks/aci-containers/pkg/index"
43
        "github.com/noironetworks/aci-containers/pkg/ipam"
44
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
45
        "github.com/noironetworks/aci-containers/pkg/metadata"
46
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
47
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
48
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
49
        "github.com/noironetworks/aci-containers/pkg/util"
50
)
51

52
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
53
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
54
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
55

56
type AciController struct {
57
        log    *logrus.Logger
58
        config *ControllerConfig
59
        env    Environment
60

61
        defaultEg string
62
        defaultSg string
63

64
        unitTestMode bool
65

66
        podQueue             workqueue.RateLimitingInterface
67
        netPolQueue          workqueue.RateLimitingInterface
68
        qosQueue             workqueue.RateLimitingInterface
69
        serviceQueue         workqueue.RateLimitingInterface
70
        snatQueue            workqueue.RateLimitingInterface
71
        netflowQueue         workqueue.RateLimitingInterface
72
        erspanQueue          workqueue.RateLimitingInterface
73
        snatNodeInfoQueue    workqueue.RateLimitingInterface
74
        rdConfigQueue        workqueue.RateLimitingInterface
75
        istioQueue           workqueue.RateLimitingInterface
76
        nodeFabNetAttQueue   workqueue.RateLimitingInterface
77
        staticFabNetAttQueue workqueue.RateLimitingInterface
78
        nadVlanMapQueue      workqueue.RateLimitingInterface
79
        fabricVlanPoolQueue  workqueue.RateLimitingInterface
80

81
        namespaceIndexer        cache.Indexer
82
        namespaceInformer       cache.Controller
83
        podIndexer              cache.Indexer
84
        podInformer             cache.Controller
85
        endpointsIndexer        cache.Indexer
86
        endpointsInformer       cache.Controller
87
        serviceIndexer          cache.Indexer
88
        serviceInformer         cache.Controller
89
        replicaSetIndexer       cache.Indexer
90
        replicaSetInformer      cache.Controller
91
        deploymentIndexer       cache.Indexer
92
        deploymentInformer      cache.Controller
93
        nodeIndexer             cache.Indexer
94
        nodeInformer            cache.Controller
95
        networkPolicyIndexer    cache.Indexer
96
        networkPolicyInformer   cache.Controller
97
        snatIndexer             cache.Indexer
98
        snatInformer            cache.Controller
99
        snatNodeInfoIndexer     cache.Indexer
100
        snatNodeInformer        cache.Controller
101
        crdInformer             cache.Controller
102
        rdConfigInformer        cache.Controller
103
        rdConfigIndexer         cache.Indexer
104
        qosIndexer              cache.Indexer
105
        qosInformer             cache.Controller
106
        netflowIndexer          cache.Indexer
107
        netflowInformer         cache.Controller
108
        erspanIndexer           cache.Indexer
109
        erspanInformer          cache.Controller
110
        nodePodIfIndexer        cache.Indexer
111
        nodePodIfInformer       cache.Controller
112
        istioIndexer            cache.Indexer
113
        istioInformer           cache.Controller
114
        endpointSliceIndexer    cache.Indexer
115
        endpointSliceInformer   cache.Controller
116
        snatCfgInformer         cache.Controller
117
        updatePod               podUpdateFunc
118
        updateNode              nodeUpdateFunc
119
        updateServiceStatus     serviceUpdateFunc
120
        nodeFabNetAttInformer   cache.SharedIndexInformer
121
        staticFabNetAttInformer cache.SharedIndexInformer
122
        nadVlanMapInformer      cache.SharedIndexInformer
123
        fabricVlanPoolInformer  cache.SharedIndexInformer
124

125
        indexMutex sync.Mutex
126

127
        configuredPodNetworkIps *netIps
128
        podNetworkIps           *netIps
129
        serviceIps              *ipam.IpCache
130
        staticServiceIps        *netIps
131
        nodeServiceIps          *netIps
132

133
        // index of pods matched by deployments
134
        depPods *index.PodSelectorIndex
135
        // index of pods matched by network policies
136
        netPolPods *index.PodSelectorIndex
137
        // index of pods matched by network policy ingress rules
138
        netPolIngressPods *index.PodSelectorIndex
139
        // index of pods matched by network policy egress rules
140
        netPolEgressPods *index.PodSelectorIndex
141
        // index of IP addresses contained in endpoints objects
142
        endpointsIpIndex cidranger.Ranger
143
        // index of service target ports
144
        targetPortIndex map[string]*portIndexEntry
145
        // index of ip blocks referenced by network policy egress rules
146
        netPolSubnetIndex cidranger.Ranger
147
        // index of pods matched by erspan policies
148
        erspanPolPods *index.PodSelectorIndex
149

150
        apicConn *apicapi.ApicConnection
151

152
        nodeServiceMetaCache map[string]*nodeServiceMeta
153
        nodeACIPod           map[string]aciPodAnnot
154
        nodeACIPodAnnot      map[string]aciPodAnnot
155
        nodeOpflexDevice     map[string]apicapi.ApicSlice
156
        nodePodNetCache      map[string]*nodePodNetMeta
157
        serviceMetaCache     map[string]*serviceMeta
158
        snatPolicyCache      map[string]*ContSnatPolicy
159
        delayedEpSlices      []*DelayedEpSlice
160
        snatServices         map[string]bool
161
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
162
        rdConfigCache        map[string]*rdConfig.RdConfig
163
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
164
        istioCache           map[string]*istiov1.AciIstioOperator
165
        podIftoEp            map[string]*EndPointData
166
        // Node Name and Policy Name
167
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
168
        nodeSyncEnabled     bool
169
        serviceSyncEnabled  bool
170
        snatSyncEnabled     bool
171
        tunnelGetter        *tunnelState
172
        syncQueue           workqueue.RateLimitingInterface
173
        syncProcessors      map[string]func() bool
174
        serviceEndPoints    ServiceEndPointType
175
        crdHandlers         map[string]func(*AciController, <-chan struct{})
176
        stopCh              <-chan struct{}
177
        //index of containerportname to ctrPortNameEntry
178
        ctrPortNameCache map[string]*ctrPortNameEntry
179
        // named networkPolicies
180
        nmPortNp map[string]bool
181
        //maps network policy hash to hpp
182
        hppRef map[string]hppReference
183
        // cache to look for Epg DNs which are bound to Vmm domain
184
        cachedEpgDns             []string
185
        vmmClusterFaultSupported bool
186
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
187
        //Used in Shared mode
188
        sharedEncapCache map[int]*sharedEncapData
189
        // vlan to aepList
190
        sharedEncapSfnaCache    map[int]map[string]bool
191
        sharedEncapSfnaVlanMap  map[int]map[string]bool
192
        sharedEncapSfnaLabelMap map[string]map[string]bool
193
        // nadVlanMap encapLabel to vlan
194
        sharedEncapLabelMap map[string][]int
195
        lldpIfCache         map[string]string
196
        globalVlanConfig    globalVlanConfig
197
        fabricVlanPoolMap   map[string]map[string]string
198
}
199

200
type sharedEncapData struct {
201
        //node to NAD to pods
202
        Pods   map[string]map[string][]string
203
        NetRef map[string]*AdditionalNetworkMeta
204
}
205

206
type globalVlanConfig struct {
207
        SharedPhysDom apicapi.ApicObject
208
}
209

210
type hppReference struct {
211
        RefCount uint              `json:"ref-count,omitempty"`
212
        Npkeys   []string          `json:"npkeys,omitempty"`
213
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
214
}
215

216
type DelayedEpSlice struct {
217
        ServiceKey  string
218
        OldEpSlice  *discovery.EndpointSlice
219
        NewEpSlice  *discovery.EndpointSlice
220
        DelayedTime time.Time
221
}
222

223
type aciPodAnnot struct {
224
        aciPod             string
225
        isSingleOpflexOdev bool
226
        disconnectTime     time.Time
227
        connectTime        time.Time
228
}
229

230
type nodeServiceMeta struct {
231
        serviceEp metadata.ServiceEndpoint
232
}
233

234
type nodePodNetMeta struct {
235
        nodePods            map[string]bool
236
        podNetIps           metadata.NetIps
237
        podNetIpsAnnotation string
238
}
239

240
type serviceMeta struct {
241
        requestedIps     []net.IP
242
        ingressIps       []net.IP
243
        staticIngressIps []net.IP
244
}
245

246
type ipIndexEntry struct {
247
        ipNet net.IPNet
248
        keys  map[string]bool
249
}
250

251
type targetPort struct {
252
        proto v1.Protocol
253
        ports []int
254
}
255

256
type portIndexEntry struct {
257
        port              targetPort
258
        serviceKeys       map[string]bool
259
        networkPolicyKeys map[string]bool
260
}
261

262
type portRangeSnat struct {
263
        start int
264
        end   int
265
}
266

267
// EndPointData holds PodIF data in controller.
268
type EndPointData struct {
269
        MacAddr    string
270
        EPG        string
271
        Namespace  string
272
        AppProfile string
273
}
274

275
type ctrPortNameEntry struct {
276
        // Proto+port->pods
277
        ctrNmpToPods map[string]map[string]bool
278
}
279

280
type LinkData struct {
281
        Link []string
282
        Pods []string
283
}
284

285
type AdditionalNetworkMeta struct {
286
        NetworkName string
287
        EncapVlan   string
288
        //node+localiface->fabricLinks
289
        FabricLink map[string]map[string]LinkData
290
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
291
}
292

293
type ServiceEndPointType interface {
294
        InitClientInformer(kubeClient *kubernetes.Clientset)
295
        Run(stopCh <-chan struct{})
296
        Wait(stopCh <-chan struct{})
297
        UpdateServicesForNode(nodename string)
298
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
299
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
300
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
301
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
302
}
303

304
type serviceEndpoint struct {
305
        cont *AciController
306
}
307
type serviceEndpointSlice struct {
308
        cont *AciController
309
}
310

311
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
312
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
313
}
×
314

315
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
316
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
317
}
×
318

319
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
320
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
321
}
1✔
322

323
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
324
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
325
}
1✔
326

327
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
328
        cache.WaitForCacheSync(stopCh,
1✔
329
                sep.cont.endpointsInformer.HasSynced,
1✔
330
                sep.cont.serviceInformer.HasSynced)
1✔
331
}
1✔
332

333
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
334
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
335
        cache.WaitForCacheSync(stopCh,
1✔
336
                seps.cont.endpointSliceInformer.HasSynced,
1✔
337
                seps.cont.serviceInformer.HasSynced)
1✔
338
}
1✔
339

340
func (e *ipIndexEntry) Network() net.IPNet {
1✔
341
        return e.ipNet
1✔
342
}
1✔
343

344
func newNodePodNetMeta() *nodePodNetMeta {
1✔
345
        return &nodePodNetMeta{
1✔
346
                nodePods: make(map[string]bool),
1✔
347
        }
1✔
348
}
1✔
349

350
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
351
        return workqueue.NewNamedRateLimitingQueue(
1✔
352
                workqueue.NewMaxOfRateLimiter(
1✔
353
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
354
                                10*time.Second),
1✔
355
                        &workqueue.BucketRateLimiter{
1✔
356
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
357
                        },
1✔
358
                ),
1✔
359
                "delta")
1✔
360
}
1✔
361

362
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
363
        cont := &AciController{
1✔
364
                log:          log,
1✔
365
                config:       config,
1✔
366
                env:          env,
1✔
367
                defaultEg:    "",
1✔
368
                defaultSg:    "",
1✔
369
                unitTestMode: unittestmode,
1✔
370

1✔
371
                podQueue:             createQueue("pod"),
1✔
372
                netPolQueue:          createQueue("networkPolicy"),
1✔
373
                qosQueue:             createQueue("qos"),
1✔
374
                netflowQueue:         createQueue("netflow"),
1✔
375
                erspanQueue:          createQueue("erspan"),
1✔
376
                serviceQueue:         createQueue("service"),
1✔
377
                snatQueue:            createQueue("snat"),
1✔
378
                snatNodeInfoQueue:    createQueue("snatnodeinfo"),
1✔
379
                rdConfigQueue:        createQueue("rdconfig"),
1✔
380
                istioQueue:           createQueue("istio"),
1✔
381
                nodeFabNetAttQueue:   createQueue("nodefabricnetworkattachment"),
1✔
382
                staticFabNetAttQueue: createQueue("staticfabricnetworkattachment"),
1✔
383
                nadVlanMapQueue:      createQueue("nadvlanmap"),
1✔
384
                fabricVlanPoolQueue:  createQueue("fabricvlanpool"),
1✔
385
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
386
                        &workqueue.BucketRateLimiter{
1✔
387
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
388
                        }, "sync"),
1✔
389

1✔
390
                configuredPodNetworkIps: newNetIps(),
1✔
391
                podNetworkIps:           newNetIps(),
1✔
392
                serviceIps:              ipam.NewIpCache(),
1✔
393
                staticServiceIps:        newNetIps(),
1✔
394
                nodeServiceIps:          newNetIps(),
1✔
395

1✔
396
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
397
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
398
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
399

1✔
400
                nodeServiceMetaCache:    make(map[string]*nodeServiceMeta),
1✔
401
                nodePodNetCache:         make(map[string]*nodePodNetMeta),
1✔
402
                serviceMetaCache:        make(map[string]*serviceMeta),
1✔
403
                snatPolicyCache:         make(map[string]*ContSnatPolicy),
1✔
404
                snatServices:            make(map[string]bool),
1✔
405
                snatNodeInfoCache:       make(map[string]*nodeinfo.NodeInfo),
1✔
406
                rdConfigCache:           make(map[string]*rdConfig.RdConfig),
1✔
407
                rdConfigSubnetCache:     make(map[string]*rdConfig.RdConfigSpec),
1✔
408
                podIftoEp:               make(map[string]*EndPointData),
1✔
409
                snatGlobalInfoCache:     make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
410
                istioCache:              make(map[string]*istiov1.AciIstioOperator),
1✔
411
                crdHandlers:             make(map[string]func(*AciController, <-chan struct{})),
1✔
412
                ctrPortNameCache:        make(map[string]*ctrPortNameEntry),
1✔
413
                nmPortNp:                make(map[string]bool),
1✔
414
                hppRef:                  make(map[string]hppReference),
1✔
415
                additionalNetworkCache:  make(map[string]*AdditionalNetworkMeta),
1✔
416
                sharedEncapCache:        make(map[int]*sharedEncapData),
1✔
417
                sharedEncapSfnaCache:    make(map[int]map[string]bool),
1✔
418
                sharedEncapSfnaVlanMap:  make(map[int]map[string]bool),
1✔
419
                sharedEncapSfnaLabelMap: make(map[string]map[string]bool),
1✔
420
                sharedEncapLabelMap:     make(map[string][]int),
1✔
421
                lldpIfCache:             make(map[string]string),
1✔
422
                fabricVlanPoolMap:       make(map[string]map[string]string),
1✔
423
        }
1✔
424
        cont.syncProcessors = map[string]func() bool{
1✔
425
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
426
                "rdConfig":       cont.syncRdConfig,
1✔
427
                "istioCR":        cont.createIstioCR,
1✔
428
        }
1✔
429
        return cont
1✔
430
}
1✔
431

432
func (cont *AciController) Init() {
×
433
        if cont.config.ChainedMode {
×
434
                cont.log.Info("In chained mode")
×
435
        }
×
436
        if cont.config.LBType != lbTypeAci && !cont.config.ChainedMode {
×
437
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedNwPol")
×
438
                if err != nil {
×
439
                        panic(err.Error())
×
440
                }
441
        }
442

443
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
444
        if err != nil {
×
445
                cont.log.Error("Could not serialize default endpoint group")
×
446
                panic(err.Error())
×
447
        }
448
        cont.defaultEg = string(egdata)
×
449

×
450
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
451
        if err != nil {
×
452
                cont.log.Error("Could not serialize default security groups")
×
453
                panic(err.Error())
×
454
        }
455
        cont.defaultSg = string(sgdata)
×
456

×
457
        cont.log.Debug("Initializing IPAM")
×
458
        cont.initIpam()
×
459
        // check if the cluster supports endpoint slices
×
460
        // if cluster doesn't have the support fallback to endpoints
×
461
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
462
        if util.IsEndPointSlicesSupported(kubeClient) {
×
463
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
464
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
465
                cont.log.Info("Initializing ServiceEndpointSlices")
×
466
        } else {
×
467
                cont.serviceEndPoints = &serviceEndpoint{}
×
468
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
469
                cont.log.Info("Initializing ServiceEndpoints")
×
470
        }
×
471

472
        err = cont.env.Init(cont)
×
473
        if err != nil {
×
474
                panic(err.Error())
×
475
        }
476
}
477

478
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
479
        store cache.Store, handler func(interface{}) bool,
480
        deleteHandler func(string) bool,
481
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
482
        go wait.Until(func() {
2✔
483
                for {
2✔
484
                        key, quit := queue.Get()
1✔
485
                        if quit {
2✔
486
                                break
1✔
487
                        }
488

489
                        var requeue bool
1✔
490
                        switch key := key.(type) {
1✔
491
                        case chan struct{}:
×
492
                                close(key)
×
493
                        case string:
1✔
494
                                if strings.HasPrefix(key, "DELETED_") {
2✔
495
                                        delKey := strings.Trim(key, "DELETED_")
1✔
496
                                        requeue = deleteHandler(delKey)
1✔
497
                                } else {
2✔
498
                                        obj, exists, err := store.GetByKey(key)
1✔
499
                                        if err != nil {
1✔
500
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
501
                                        }
×
502
                                        //Handle Add/Update/Delete
503
                                        if exists && handler != nil {
2✔
504
                                                requeue = handler(obj)
1✔
505
                                        }
1✔
506
                                        //Handle Post Delete
507
                                        if !exists && postDelHandler != nil {
1✔
508
                                                requeue = postDelHandler()
×
509
                                        }
×
510
                                }
511
                        }
512
                        if requeue {
2✔
513
                                queue.AddRateLimited(key)
1✔
514
                        } else {
2✔
515
                                queue.Forget(key)
1✔
516
                        }
1✔
517
                        queue.Done(key)
1✔
518
                }
519
        }, time.Second, stopCh)
520
        <-stopCh
1✔
521
        queue.ShutDown()
1✔
522
}
523

524
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
525
        return apicapi.ApicSlice{}
1✔
526
}
1✔
527

528
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
529
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
530
}
1✔
531

532
func (cont *AciController) initStaticObjs() {
1✔
533
        cont.env.InitStaticAciObjects()
1✔
534
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
535
                cont.globalStaticObjs())
1✔
536
}
1✔
537

538
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
539
        vmmProv = "Kubernetes"
1✔
540
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
541
                vmmProv = "OpenShift"
×
542
        }
×
543
        return
1✔
544
}
545

546
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
547
        var err error
1✔
548
        var privKey []byte
1✔
549
        var apicCert []byte
1✔
550

1✔
551
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
552

1✔
553
        if cont.config.ApicPrivateKeyPath != "" {
1✔
554
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
555
                if err != nil {
×
556
                        panic(err)
×
557
                }
558
        }
559
        if cont.config.ApicCertPath != "" {
1✔
560
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
561
                if err != nil {
×
562
                        panic(err)
×
563
                }
564
        }
565
        // If not defined, default is 1800
566
        if cont.config.ApicRefreshTimer == "" {
2✔
567
                cont.config.ApicRefreshTimer = "1800"
1✔
568
        }
1✔
569
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
570
        if err != nil {
1✔
571
                panic(err)
×
572
        }
573
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
574

1✔
575
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
576
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
577
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
578
                panic(err)
×
579
        }
580

581
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
582
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
583
                cont.config.ApicRefreshTickerAdjust = "150"
1✔
584
        }
1✔
585
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
586
        if err != nil {
1✔
587
                panic(err)
×
588
        }
589

590
        //If ApicSubscriptionDelay is not defined, default to 100ms
591
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
592
                cont.config.ApicSubscriptionDelay = 100
1✔
593
        }
1✔
594
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
595

1✔
596
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
597
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
598
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
599
        }
1✔
600

601
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
602
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
603
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
604
        }
1✔
605
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
606

1✔
607
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
608
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
609
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
610
        }
1✔
611

612
        // If not defined, default to 32
613
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
614
                cont.config.PodIpPoolChunkSize = 32
1✔
615
        }
1✔
616
        if !cont.config.ChainedMode {
2✔
617
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
618
        }
1✔
619
        // If not valid, default to 5000-65000
620
        // other permissible values 1-65000
621
        defStart := 5000
1✔
622
        defEnd := 65000
1✔
623
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
624
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
625
        }
1✔
626
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
627
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
628
        }
1✔
629
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
630
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
631
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
632
                cont.config.SnatDefaultPortRangeStart = defStart
×
633
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
634
        }
×
635

636
        // Set default value for pbr programming delay if services list is not empty
637
        // and delay value is empty
638
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
639
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
640
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
641
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
642
        }
×
643
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
644
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
645
        }
×
646

647
        // Set contract scope for snat svc graph to global by default
648
        if cont.config.SnatSvcContractScope == "" {
2✔
649
                cont.config.SnatSvcContractScope = "global"
1✔
650
        }
1✔
651
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
652
                cont.config.MaxSvcGraphNodes = 32
1✔
653
        }
1✔
654
        if !cont.config.ChainedMode {
2✔
655
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
656
        }
1✔
657
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
658
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
659
                privKey, apicCert, cont.config.AciPrefix,
1✔
660
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
661
                cont.config.AciVrfTenant)
1✔
662
        if err != nil {
1✔
663
                panic(err)
×
664
        }
665

666
        if len(cont.config.ApicHosts) != 0 {
1✔
667
                cont.log.WithFields(logrus.Fields{
×
668
                        "mod":  "APICAPI",
×
669
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
670
                }).Debug("Connecting to APIC to determine the Version")
×
671

×
672
                version, err := cont.apicConn.GetVersion()
×
673
                if err != nil {
×
674
                        cont.log.Error("Could not get APIC version")
×
675
                        panic(err)
×
676
                }
677
                cont.apicConn.CachedVersion = version
×
678
                apicapi.ApicVersion = version
×
679
                if version >= "4.2(4i)" {
×
680
                        cont.apicConn.SnatPbrFltrChain = true
×
681
                } else {
×
682
                        cont.apicConn.SnatPbrFltrChain = false
×
683
                }
×
684
                if version >= "5.2" {
×
685
                        cont.vmmClusterFaultSupported = true
×
686
                }
×
687
        } else { // For unit-tests
1✔
688
                cont.apicConn.SnatPbrFltrChain = true
1✔
689
        }
1✔
690

691
        if !cont.config.ChainedMode {
2✔
692
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
693
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
694
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
695
                        var expectedVrfRelations []string
×
696
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
697
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
698
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
699
                        if err != nil {
×
700
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
701
                                panic(err)
×
702
                        }
703
                }
704
        }
705

706
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.config.ChainedMode {
1✔
707
                //Clear fault instances when the controller starts
×
708
                cont.clearFaultInstances()
×
709
                //Subscribe for vmmEpPD for a given domain
×
710
                var tnTargetFilterEpg string
×
711
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
712
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
713
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
714
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
715
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
716
                        func(obj apicapi.ApicObject) bool {
×
717
                                cont.vmmEpPDChanged(obj)
×
718
                                return true
×
719
                        },
×
720
                        func(dn string) {
×
721
                                cont.vmmEpPDDeleted(dn)
×
722
                        })
×
723
        }
724

725
        cont.initStaticObjs()
1✔
726

1✔
727
        err = cont.env.PrepareRun(stopCh)
1✔
728
        if err != nil {
1✔
729
                panic(err.Error())
×
730
        }
731

732
        cont.apicConn.FullSyncHook = func() {
1✔
733
                // put a channel into each work queue and wait on it to
×
734
                // checkpoint object syncing in response to new subscription
×
735
                // updates
×
736
                cont.log.Debug("Starting checkpoint")
×
737
                var chans []chan struct{}
×
738
                qs := make([]workqueue.RateLimitingInterface, 0)
×
739
                _, ok := cont.env.(*K8sEnvironment)
×
740
                if ok {
×
741
                        if !cont.config.ChainedMode {
×
742
                                qs = []workqueue.RateLimitingInterface{
×
743
                                        cont.podQueue, cont.netPolQueue, cont.qosQueue,
×
744
                                        cont.serviceQueue, cont.snatQueue, cont.netflowQueue,
×
745
                                        cont.snatNodeInfoQueue, cont.rdConfigQueue, cont.erspanQueue,
×
746
                                }
×
747
                        } else {
×
748
                                qs = []workqueue.RateLimitingInterface{
×
749
                                        cont.podQueue,
×
750
                                }
×
751
                        }
×
752
                }
753
                for _, q := range qs {
×
754
                        c := make(chan struct{})
×
755
                        chans = append(chans, c)
×
756
                        q.Add(c)
×
757
                }
×
758
                for _, c := range chans {
×
759
                        <-c
×
760
                }
×
761
                cont.log.Debug("Checkpoint complete")
×
762
        }
763

764
        if len(cont.config.ApicHosts) != 0 && !cont.config.ChainedMode {
1✔
765
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
766
                cont.scheduleRdConfig()
×
767
        }
×
768

769
        if !cont.config.ChainedMode {
2✔
770
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
771
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
772
                                []string{"hostprotPol"})
1✔
773
                }
1✔
774
        } else {
×
775
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
×
776
                        []string{"fvBD", "fvAp"})
×
777
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
×
778
                        []string{"fvnsVlanInstP"}, "")
×
779
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
×
780
                        []string{"infraRsDomP"}, "")
×
781
                cont.apicConn.AddSubscriptionClass("physDomP",
×
782
                        []string{"physDomP"}, "")
×
783
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
×
784
                        []string{"infraRsVlanNs"}, "")
×
785
                cont.apicConn.AddSubscriptionClass("infraGeneric",
×
786
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
×
787
        }
×
788
        if !cont.config.ChainedMode {
2✔
789
                // When a new class is added for subscriptio, check if its name attribute
1✔
790
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
791
                // in apicapi.go
1✔
792
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
793
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
794
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
795
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
NEW
796
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
NEW
797
                }
×
798
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
799
                        subscribeMo)
1✔
800
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
801
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
802
                        []string{"fvRsCons"})
1✔
803
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
804
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
805
                        cont.config.AciVmmController)
1✔
806
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
807
                // Since it is not supported for APIC versions < "5.0"
1✔
808
                cont.addVmmInjectedLabel()
1✔
809
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
810
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
811

1✔
812
                var tnTargetFilter string
1✔
813
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
814
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
815
                                tnTargetFilter += fmt.Sprintf("tn-%s|", tn)
×
816
                        }
×
817
                } else {
1✔
818
                        tnTargetFilter += fmt.Sprintf("tn-%s|tn-%s",
1✔
819
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
820
                }
1✔
821
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
822
                        tnTargetFilter)
1✔
823
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
824
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
825

1✔
826
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
827
                        func(obj apicapi.ApicObject) bool {
1✔
828
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
829
                                return true
×
830
                        },
×
831
                        func(dn string) {
×
832
                                cont.SubnetDeleted(dn)
×
833
                        })
×
834

835
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
836
                        []string{"opflexODev"}, "")
1✔
837

1✔
838
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
839
                        func(obj apicapi.ApicObject) bool {
1✔
840
                                cont.opflexDeviceChanged(obj)
×
841
                                return true
×
842
                        },
×
843
                        func(dn string) {
×
844
                                cont.opflexDeviceDeleted(dn)
×
845
                        })
×
846
        }
847
        go cont.apicConn.Run(stopCh)
1✔
848
}
849

850
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
851
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
852
        ticker := time.NewTicker(seconds * time.Second)
1✔
853
        defer ticker.Stop()
1✔
854

1✔
855
        for {
2✔
856
                select {
1✔
857
                case <-ticker.C:
1✔
858
                        cont.checkChangeOfOpflexOdevAciPod()
1✔
859
                        if cont.config.AciMultipod {
1✔
860
                                cont.checkChangeOfOdevAciPod()
×
861
                        }
×
862
                case <-stopCh:
1✔
863
                        return
1✔
864
                }
865
        }
866
}
867

868
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
869
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
870
        ticker := time.NewTicker(seconds * time.Second)
1✔
871
        defer ticker.Stop()
1✔
872

1✔
873
        for {
2✔
874
                select {
1✔
875
                case <-ticker.C:
1✔
876
                        cont.deleteOldOpflexDevices()
1✔
877
                case <-stopCh:
1✔
878
                        return
1✔
879
                }
880
        }
881
}
882

883
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
884
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
885
        ticker := time.NewTicker(seconds * time.Second)
1✔
886
        defer ticker.Stop()
1✔
887

1✔
888
        for {
2✔
889
                select {
1✔
890
                case <-ticker.C:
1✔
891
                        cont.processDelayedEpSlices()
1✔
892
                case <-stopCh:
1✔
893
                        return
1✔
894
                }
895
        }
896
}
897

898
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
899
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
900
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
901
        iteration := 0
1✔
902
        for {
2✔
903
                // To avoid noisy logs, only printing once in 5 minutes
1✔
904
                if iteration%5 == 0 {
2✔
905
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
906
                }
1✔
907
                var nodeInfos []*nodeinfo.NodeInfo
1✔
908
                cont.indexMutex.Lock()
1✔
909
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
910
                        func(nodeInfoObj interface{}) {
2✔
911
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
912
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
913
                        })
1✔
914
                expectedmap := make(map[string]map[string]bool)
1✔
915
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
916
                        for nodename, entry := range glinfo {
2✔
917
                                if _, found := expectedmap[nodename]; !found {
2✔
918
                                        newentry := make(map[string]bool)
1✔
919
                                        newentry[entry.SnatPolicyName] = true
1✔
920
                                        expectedmap[nodename] = newentry
1✔
921
                                } else {
2✔
922
                                        currententry := expectedmap[nodename]
1✔
923
                                        currententry[entry.SnatPolicyName] = true
1✔
924
                                        expectedmap[nodename] = currententry
1✔
925
                                }
1✔
926
                        }
927
                }
928
                cont.indexMutex.Unlock()
1✔
929

1✔
930
                for _, value := range nodeInfos {
2✔
931
                        marked := false
1✔
932
                        policyNames := value.Spec.SnatPolicyNames
1✔
933
                        nodeName := value.ObjectMeta.Name
1✔
934
                        _, ok := expectedmap[nodeName]
1✔
935
                        if !ok && len(policyNames) > 0 {
2✔
936
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
937
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
938
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
939
                                marked = true
1✔
940
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
941
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
942
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
943
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
944
                                marked = true
1✔
945
                        } else {
2✔
946
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
947
                                        // No snatpolicies present
×
948
                                        continue
×
949
                                }
950
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
951
                                if !eq {
2✔
952
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
953
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
954
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
955
                                        marked = true
1✔
956
                                }
1✔
957
                        }
958
                        if marked {
2✔
959
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
960
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
961
                                if err != nil {
1✔
962
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
963
                                        continue
×
964
                                }
965
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
966
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
967
                        } else if iteration%5 == 0 {
2✔
968
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
969
                        }
1✔
970
                }
971
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
972
                iteration++
1✔
973
        }
974
}
975

976
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
977
        queueStop <-chan struct{}) {
1✔
978
        go wait.Until(func() {
2✔
979
                for {
2✔
980
                        syncType, quit := queue.Get()
1✔
981
                        if quit {
2✔
982
                                break
1✔
983
                        }
984
                        var requeue bool
1✔
985
                        if sType, ok := syncType.(string); ok {
2✔
986
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
987
                                        requeue = f()
1✔
988
                                }
1✔
989
                        }
990
                        if requeue {
1✔
991
                                queue.AddRateLimited(syncType)
×
992
                        } else {
1✔
993
                                queue.Forget(syncType)
1✔
994
                        }
1✔
995
                        queue.Done(syncType)
1✔
996
                }
997
        }, time.Second, queueStop)
998
        <-queueStop
1✔
999
        queue.ShutDown()
1✔
1000
}
1001

1002
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1003
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1004
}
1✔
1005
func (cont *AciController) scheduleRdConfig() {
×
1006
        cont.syncQueue.AddRateLimited("rdConfig")
×
1007
}
×
1008
func (cont *AciController) scheduleCreateIstioCR() {
×
1009
        cont.syncQueue.AddRateLimited("istioCR")
×
1010
}
×
1011

1012
func (cont *AciController) addVmmInjectedLabel() {
1✔
1013
        if apicapi.ApicVersion >= "5.2" {
1✔
1014
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1015
                if err != nil {
×
1016
                        panic(err.Error())
×
1017
                }
1018
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1019
                if err != nil {
×
1020
                        panic(err.Error())
×
1021
                }
1022
        }
1023
        if apicapi.ApicVersion >= "5.0" {
1✔
1024
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
×
1025
                if err != nil {
×
1026
                        panic(err.Error())
×
1027
                }
1028
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
×
1029
                if err != nil {
×
1030
                        panic(err.Error())
×
1031
                }
1032
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
×
1033
                if err != nil {
×
1034
                        panic(err.Error())
×
1035
                }
1036
        }
1037
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc