• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 8274

06 Dec 2023 10:05AM UTC coverage: 53.692% (-0.1%) from 53.79%
8274

Pull #1206

travis-pro

web-flow
Merge cbf87aef6 into afa25a6a5
Pull Request #1206: Added paremeter enable-opflex-agent-reconnect

5 of 29 new or added lines in 3 files covered. (17.24%)

17 existing lines in 3 files now uncovered.

13263 of 24702 relevant lines covered (53.69%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        "k8s.io/apimachinery/pkg/labels"
35
        "k8s.io/apimachinery/pkg/util/wait"
36
        "k8s.io/client-go/kubernetes"
37
        "k8s.io/client-go/tools/cache"
38
        "k8s.io/client-go/util/workqueue"
39

40
        "github.com/noironetworks/aci-containers/pkg/apicapi"
41
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
42
        "github.com/noironetworks/aci-containers/pkg/index"
43
        "github.com/noironetworks/aci-containers/pkg/ipam"
44
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
45
        "github.com/noironetworks/aci-containers/pkg/metadata"
46
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
47
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
48
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
49
        "github.com/noironetworks/aci-containers/pkg/util"
50
)
51

52
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
53
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
54
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
55

56
type AciController struct {
57
        log    *logrus.Logger
58
        config *ControllerConfig
59
        env    Environment
60

61
        defaultEg string
62
        defaultSg string
63

64
        unitTestMode bool
65

66
        podQueue             workqueue.RateLimitingInterface
67
        netPolQueue          workqueue.RateLimitingInterface
68
        qosQueue             workqueue.RateLimitingInterface
69
        serviceQueue         workqueue.RateLimitingInterface
70
        snatQueue            workqueue.RateLimitingInterface
71
        netflowQueue         workqueue.RateLimitingInterface
72
        erspanQueue          workqueue.RateLimitingInterface
73
        snatNodeInfoQueue    workqueue.RateLimitingInterface
74
        rdConfigQueue        workqueue.RateLimitingInterface
75
        istioQueue           workqueue.RateLimitingInterface
76
        nodeFabNetAttQueue   workqueue.RateLimitingInterface
77
        staticFabNetAttQueue workqueue.RateLimitingInterface
78
        nadVlanMapQueue      workqueue.RateLimitingInterface
79
        fabricVlanPoolQueue  workqueue.RateLimitingInterface
80

81
        namespaceIndexer        cache.Indexer
82
        namespaceInformer       cache.Controller
83
        podIndexer              cache.Indexer
84
        podInformer             cache.Controller
85
        endpointsIndexer        cache.Indexer
86
        endpointsInformer       cache.Controller
87
        serviceIndexer          cache.Indexer
88
        serviceInformer         cache.Controller
89
        replicaSetIndexer       cache.Indexer
90
        replicaSetInformer      cache.Controller
91
        deploymentIndexer       cache.Indexer
92
        deploymentInformer      cache.Controller
93
        nodeIndexer             cache.Indexer
94
        nodeInformer            cache.Controller
95
        networkPolicyIndexer    cache.Indexer
96
        networkPolicyInformer   cache.Controller
97
        snatIndexer             cache.Indexer
98
        snatInformer            cache.Controller
99
        snatNodeInfoIndexer     cache.Indexer
100
        snatNodeInformer        cache.Controller
101
        crdInformer             cache.Controller
102
        rdConfigInformer        cache.Controller
103
        rdConfigIndexer         cache.Indexer
104
        qosIndexer              cache.Indexer
105
        qosInformer             cache.Controller
106
        netflowIndexer          cache.Indexer
107
        netflowInformer         cache.Controller
108
        erspanIndexer           cache.Indexer
109
        erspanInformer          cache.Controller
110
        nodePodIfIndexer        cache.Indexer
111
        nodePodIfInformer       cache.Controller
112
        istioIndexer            cache.Indexer
113
        istioInformer           cache.Controller
114
        endpointSliceIndexer    cache.Indexer
115
        endpointSliceInformer   cache.Controller
116
        snatCfgInformer         cache.Controller
117
        updatePod               podUpdateFunc
118
        updateNode              nodeUpdateFunc
119
        updateServiceStatus     serviceUpdateFunc
120
        nodeFabNetAttInformer   cache.SharedIndexInformer
121
        staticFabNetAttInformer cache.SharedIndexInformer
122
        nadVlanMapInformer      cache.SharedIndexInformer
123
        fabricVlanPoolInformer  cache.SharedIndexInformer
124

125
        indexMutex sync.Mutex
126

127
        configuredPodNetworkIps *netIps
128
        podNetworkIps           *netIps
129
        serviceIps              *ipam.IpCache
130
        staticServiceIps        *netIps
131
        nodeServiceIps          *netIps
132

133
        // index of pods matched by deployments
134
        depPods *index.PodSelectorIndex
135
        // index of pods matched by network policies
136
        netPolPods *index.PodSelectorIndex
137
        // index of pods matched by network policy ingress rules
138
        netPolIngressPods *index.PodSelectorIndex
139
        // index of pods matched by network policy egress rules
140
        netPolEgressPods *index.PodSelectorIndex
141
        // index of IP addresses contained in endpoints objects
142
        endpointsIpIndex cidranger.Ranger
143
        // index of service target ports
144
        targetPortIndex map[string]*portIndexEntry
145
        // index of ip blocks referenced by network policy egress rules
146
        netPolSubnetIndex cidranger.Ranger
147
        // index of pods matched by erspan policies
148
        erspanPolPods *index.PodSelectorIndex
149

150
        apicConn *apicapi.ApicConnection
151

152
        nodeServiceMetaCache map[string]*nodeServiceMeta
153
        nodeACIPod           map[string]aciPodAnnot
154
        nodeACIPodAnnot      map[string]aciPodAnnot
155
        nodeOpflexDevice     map[string]apicapi.ApicSlice
156
        nodePodNetCache      map[string]*nodePodNetMeta
157
        serviceMetaCache     map[string]*serviceMeta
158
        snatPolicyCache      map[string]*ContSnatPolicy
159
        delayedEpSlices      []*DelayedEpSlice
160
        snatServices         map[string]bool
161
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
162
        rdConfigCache        map[string]*rdConfig.RdConfig
163
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
164
        istioCache           map[string]*istiov1.AciIstioOperator
165
        podIftoEp            map[string]*EndPointData
166
        // Node Name and Policy Name
167
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
168
        nodeSyncEnabled     bool
169
        serviceSyncEnabled  bool
170
        snatSyncEnabled     bool
171
        tunnelGetter        *tunnelState
172
        syncQueue           workqueue.RateLimitingInterface
173
        syncProcessors      map[string]func() bool
174
        serviceEndPoints    ServiceEndPointType
175
        crdHandlers         map[string]func(*AciController, <-chan struct{})
176
        stopCh              <-chan struct{}
177
        //index of containerportname to ctrPortNameEntry
178
        ctrPortNameCache map[string]*ctrPortNameEntry
179
        // named networkPolicies
180
        nmPortNp map[string]bool
181
        //maps network policy hash to hpp
182
        hppRef map[string]hppReference
183
        // cache to look for Epg DNs which are bound to Vmm domain
184
        cachedEpgDns             []string
185
        vmmClusterFaultSupported bool
186
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
187
        //Used in Shared mode
188
        sharedEncapCache map[int]*sharedEncapData
189
        // vlan to aepList
190
        sharedEncapSfnaCache    map[int]map[string]bool
191
        sharedEncapSfnaVlanMap  map[int]map[string]bool
192
        sharedEncapSfnaLabelMap map[string]map[string]bool
193
        // nadVlanMap encapLabel to vlan
194
        sharedEncapLabelMap map[string][]int
195
        lldpIfCache         map[string]string
196
        globalVlanConfig    globalVlanConfig
197
        fabricVlanPoolMap   map[string]map[string]string
198
}
199

200
type sharedEncapData struct {
201
        //node to NAD to pods
202
        Pods   map[string]map[string][]string
203
        NetRef map[string]*AdditionalNetworkMeta
204
}
205

206
type globalVlanConfig struct {
207
        SharedPhysDom apicapi.ApicObject
208
}
209

210
type hppReference struct {
211
        RefCount uint              `json:"ref-count,omitempty"`
212
        Npkeys   []string          `json:"npkeys,omitempty"`
213
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
214
}
215

216
type DelayedEpSlice struct {
217
        ServiceKey  string
218
        OldEpSlice  *discovery.EndpointSlice
219
        NewEpSlice  *discovery.EndpointSlice
220
        DelayedTime time.Time
221
}
222

223
// aciPodAnnot is the value type of the nodeACIPod and nodeACIPodAnnot maps
// on AciController.
type aciPodAnnot struct {
	aciPod                      string
	isSingleOpflexOdev          bool
	disconnectTime, connectTime time.Time
}
229

230
type nodeServiceMeta struct {
231
        serviceEp metadata.ServiceEndpoint
232
}
233

234
type nodePodNetMeta struct {
235
        nodePods            map[string]bool
236
        podNetIps           metadata.NetIps
237
        podNetIpsAnnotation string
238
}
239

240
// serviceMeta is the value type of AciController.serviceMetaCache; it holds
// three lists of IP addresses for a service.
type serviceMeta struct {
	requestedIps, ingressIps, staticIngressIps []net.IP
}
245

246
// ipIndexEntry associates an IP network with a set of string keys. Its
// Network method returns ipNet.
type ipIndexEntry struct {
	// ipNet is the network returned by Network.
	ipNet net.IPNet
	// keys is used as a set of string keys.
	keys map[string]bool
}
250

251
type targetPort struct {
252
        proto v1.Protocol
253
        ports []int
254
}
255

256
type portIndexEntry struct {
257
        port              targetPort
258
        serviceKeys       map[string]bool
259
        networkPolicyKeys map[string]bool
260
}
261

262
// portRangeSnat is a port range given by its start and end values.
type portRangeSnat struct {
	start, end int
}
266

267
// EndPointData holds PodIF data in controller. It is the value type of
// AciController.podIftoEp.
type EndPointData struct {
	MacAddr, EPG, Namespace, AppProfile string
}
274

275
// ctrPortNameEntry is the value type of AciController.ctrPortNameCache,
// which indexes it by container port name.
type ctrPortNameEntry struct {
	// Proto+port->pods
	ctrNmpToPods map[string]map[string]bool
}
279

280
// LinkData holds a list of links and a list of pods; it is the innermost
// value type of AdditionalNetworkMeta.FabricLink.
type LinkData struct {
	Link, Pods []string
}
284

285
type AdditionalNetworkMeta struct {
286
        NetworkName string
287
        EncapVlan   string
288
        //node+localiface->fabricLinks
289
        FabricLink map[string]map[string]LinkData
290
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
291
}
292

293
type ServiceEndPointType interface {
294
        InitClientInformer(kubeClient *kubernetes.Clientset)
295
        Run(stopCh <-chan struct{})
296
        Wait(stopCh <-chan struct{})
297
        UpdateServicesForNode(nodename string)
298
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
299
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
300
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
301
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
302
}
303

304
type serviceEndpoint struct {
305
        cont *AciController
306
}
307
type serviceEndpointSlice struct {
308
        cont *AciController
309
}
310

311
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
312
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
313
}
×
314

315
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
316
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
317
}
×
318

319
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
320
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
321
}
1✔
322

323
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
324
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
325
}
1✔
326

327
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
328
        cache.WaitForCacheSync(stopCh,
1✔
329
                sep.cont.endpointsInformer.HasSynced,
1✔
330
                sep.cont.serviceInformer.HasSynced)
1✔
331
}
1✔
332

333
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
334
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
335
        cache.WaitForCacheSync(stopCh,
1✔
336
                seps.cont.endpointSliceInformer.HasSynced,
1✔
337
                seps.cont.serviceInformer.HasSynced)
1✔
338
}
1✔
339

340
func (e *ipIndexEntry) Network() net.IPNet {
1✔
341
        return e.ipNet
1✔
342
}
1✔
343

344
func newNodePodNetMeta() *nodePodNetMeta {
1✔
345
        return &nodePodNetMeta{
1✔
346
                nodePods: make(map[string]bool),
1✔
347
        }
1✔
348
}
1✔
349

350
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
351
        return workqueue.NewNamedRateLimitingQueue(
1✔
352
                workqueue.NewMaxOfRateLimiter(
1✔
353
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
354
                                10*time.Second),
1✔
355
                        &workqueue.BucketRateLimiter{
1✔
356
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
357
                        },
1✔
358
                ),
1✔
359
                "delta")
1✔
360
}
1✔
361

362
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
363
        cont := &AciController{
1✔
364
                log:          log,
1✔
365
                config:       config,
1✔
366
                env:          env,
1✔
367
                defaultEg:    "",
1✔
368
                defaultSg:    "",
1✔
369
                unitTestMode: unittestmode,
1✔
370

1✔
371
                podQueue:             createQueue("pod"),
1✔
372
                netPolQueue:          createQueue("networkPolicy"),
1✔
373
                qosQueue:             createQueue("qos"),
1✔
374
                netflowQueue:         createQueue("netflow"),
1✔
375
                erspanQueue:          createQueue("erspan"),
1✔
376
                serviceQueue:         createQueue("service"),
1✔
377
                snatQueue:            createQueue("snat"),
1✔
378
                snatNodeInfoQueue:    createQueue("snatnodeinfo"),
1✔
379
                rdConfigQueue:        createQueue("rdconfig"),
1✔
380
                istioQueue:           createQueue("istio"),
1✔
381
                nodeFabNetAttQueue:   createQueue("nodefabricnetworkattachment"),
1✔
382
                staticFabNetAttQueue: createQueue("staticfabricnetworkattachment"),
1✔
383
                nadVlanMapQueue:      createQueue("nadvlanmap"),
1✔
384
                fabricVlanPoolQueue:  createQueue("fabricvlanpool"),
1✔
385
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
386
                        &workqueue.BucketRateLimiter{
1✔
387
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
388
                        }, "sync"),
1✔
389

1✔
390
                configuredPodNetworkIps: newNetIps(),
1✔
391
                podNetworkIps:           newNetIps(),
1✔
392
                serviceIps:              ipam.NewIpCache(),
1✔
393
                staticServiceIps:        newNetIps(),
1✔
394
                nodeServiceIps:          newNetIps(),
1✔
395

1✔
396
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
397
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
398
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
399

1✔
400
                nodeServiceMetaCache:    make(map[string]*nodeServiceMeta),
1✔
401
                nodePodNetCache:         make(map[string]*nodePodNetMeta),
1✔
402
                serviceMetaCache:        make(map[string]*serviceMeta),
1✔
403
                snatPolicyCache:         make(map[string]*ContSnatPolicy),
1✔
404
                snatServices:            make(map[string]bool),
1✔
405
                snatNodeInfoCache:       make(map[string]*nodeinfo.NodeInfo),
1✔
406
                rdConfigCache:           make(map[string]*rdConfig.RdConfig),
1✔
407
                rdConfigSubnetCache:     make(map[string]*rdConfig.RdConfigSpec),
1✔
408
                podIftoEp:               make(map[string]*EndPointData),
1✔
409
                snatGlobalInfoCache:     make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
410
                istioCache:              make(map[string]*istiov1.AciIstioOperator),
1✔
411
                crdHandlers:             make(map[string]func(*AciController, <-chan struct{})),
1✔
412
                ctrPortNameCache:        make(map[string]*ctrPortNameEntry),
1✔
413
                nmPortNp:                make(map[string]bool),
1✔
414
                hppRef:                  make(map[string]hppReference),
1✔
415
                additionalNetworkCache:  make(map[string]*AdditionalNetworkMeta),
1✔
416
                sharedEncapCache:        make(map[int]*sharedEncapData),
1✔
417
                sharedEncapSfnaCache:    make(map[int]map[string]bool),
1✔
418
                sharedEncapSfnaVlanMap:  make(map[int]map[string]bool),
1✔
419
                sharedEncapSfnaLabelMap: make(map[string]map[string]bool),
1✔
420
                sharedEncapLabelMap:     make(map[string][]int),
1✔
421
                lldpIfCache:             make(map[string]string),
1✔
422
                fabricVlanPoolMap:       make(map[string]map[string]string),
1✔
423
        }
1✔
424
        cont.syncProcessors = map[string]func() bool{
1✔
425
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
426
                "rdConfig":       cont.syncRdConfig,
1✔
427
                "istioCR":        cont.createIstioCR,
1✔
428
        }
1✔
429
        return cont
1✔
430
}
1✔
431

432
func (cont *AciController) Init() {
×
433
        if cont.config.ChainedMode {
×
434
                cont.log.Info("In chained mode")
×
435
        }
×
436
        if cont.config.LBType != lbTypeAci && !cont.config.ChainedMode {
×
437
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedNwPol")
×
438
                if err != nil {
×
439
                        panic(err.Error())
×
440
                }
441
        }
442

443
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
444
        if err != nil {
×
445
                cont.log.Error("Could not serialize default endpoint group")
×
446
                panic(err.Error())
×
447
        }
448
        cont.defaultEg = string(egdata)
×
449

×
450
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
451
        if err != nil {
×
452
                cont.log.Error("Could not serialize default security groups")
×
453
                panic(err.Error())
×
454
        }
455
        cont.defaultSg = string(sgdata)
×
456

×
457
        cont.log.Debug("Initializing IPAM")
×
458
        cont.initIpam()
×
459
        // check if the cluster supports endpoint slices
×
460
        // if cluster doesn't have the support fallback to endpoints
×
461
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
462
        if util.IsEndPointSlicesSupported(kubeClient) {
×
463
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
464
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
465
                cont.log.Info("Initializing ServiceEndpointSlices")
×
466
        } else {
×
467
                cont.serviceEndPoints = &serviceEndpoint{}
×
468
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
469
                cont.log.Info("Initializing ServiceEndpoints")
×
470
        }
×
471

472
        err = cont.env.Init(cont)
×
473
        if err != nil {
×
474
                panic(err.Error())
×
475
        }
476
}
477

478
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
479
        store cache.Store, handler func(interface{}) bool,
480
        deleteHandler func(string) bool,
481
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
482
        go wait.Until(func() {
2✔
483
                for {
2✔
484
                        key, quit := queue.Get()
1✔
485
                        if quit {
2✔
486
                                break
1✔
487
                        }
488

489
                        var requeue bool
1✔
490
                        switch key := key.(type) {
1✔
491
                        case chan struct{}:
×
492
                                close(key)
×
493
                        case string:
1✔
494
                                if strings.HasPrefix(key, "DELETED_") {
2✔
495
                                        delKey := strings.Trim(key, "DELETED_")
1✔
496
                                        requeue = deleteHandler(delKey)
1✔
497
                                } else {
2✔
498
                                        obj, exists, err := store.GetByKey(key)
1✔
499
                                        if err != nil {
1✔
500
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
501
                                        }
×
502
                                        //Handle Add/Update/Delete
503
                                        if exists && handler != nil {
2✔
504
                                                requeue = handler(obj)
1✔
505
                                        }
1✔
506
                                        //Handle Post Delete
507
                                        if !exists && postDelHandler != nil {
1✔
508
                                                requeue = postDelHandler()
×
509
                                        }
×
510
                                }
511
                        }
512
                        if requeue {
2✔
513
                                queue.AddRateLimited(key)
1✔
514
                        } else {
2✔
515
                                queue.Forget(key)
1✔
516
                        }
1✔
517
                        queue.Done(key)
1✔
518
                }
519
        }, time.Second, stopCh)
520
        <-stopCh
1✔
521
        queue.ShutDown()
1✔
522
}
523

524
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
525
        return apicapi.ApicSlice{}
1✔
526
}
1✔
527

528
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
529
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
530
}
1✔
531

532
func (cont *AciController) initStaticObjs() {
1✔
533
        cont.env.InitStaticAciObjects()
1✔
534
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
535
                cont.globalStaticObjs())
1✔
536
}
1✔
537

538
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
539
        vmmProv = "Kubernetes"
1✔
540
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
541
                vmmProv = "OpenShift"
×
542
        }
×
543
        return
1✔
544
}
545

546
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
547
        var err error
1✔
548
        var privKey []byte
1✔
549
        var apicCert []byte
1✔
550

1✔
551
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
552

1✔
553
        if cont.config.ApicPrivateKeyPath != "" {
1✔
554
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
555
                if err != nil {
×
556
                        panic(err)
×
557
                }
558
        }
559
        if cont.config.ApicCertPath != "" {
1✔
560
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
561
                if err != nil {
×
562
                        panic(err)
×
563
                }
564
        }
565
        // If not defined, default is 1800
566
        if cont.config.ApicRefreshTimer == "" {
2✔
567
                cont.config.ApicRefreshTimer = "1800"
1✔
568
        }
1✔
569
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
570
        if err != nil {
1✔
571
                panic(err)
×
572
        }
573
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
574

1✔
575
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
576
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
577
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
578
                panic(err)
×
579
        }
580

581
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
582
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
583
                cont.config.ApicRefreshTickerAdjust = "150"
1✔
584
        }
1✔
585
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
586
        if err != nil {
1✔
587
                panic(err)
×
588
        }
589

590
        //If ApicSubscriptionDelay is not defined, default to 100ms
591
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
592
                cont.config.ApicSubscriptionDelay = 100
1✔
593
        }
1✔
594
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
595

1✔
596
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
597
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
598
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
599
        }
1✔
600

601
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
602
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
603
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
604
        }
1✔
605
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
606

1✔
607
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
608
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
609
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
610
        }
1✔
611

612
        // If not defined, default to 32
613
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
614
                cont.config.PodIpPoolChunkSize = 32
1✔
615
        }
1✔
616
        if !cont.config.ChainedMode {
2✔
617
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
618
        }
1✔
619
        // If not valid, default to 5000-65000
620
        // other permissible values 1-65000
621
        defStart := 5000
1✔
622
        defEnd := 65000
1✔
623
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
624
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
625
        }
1✔
626
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
627
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
628
        }
1✔
629
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
630
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
631
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
632
                cont.config.SnatDefaultPortRangeStart = defStart
×
633
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
634
        }
×
635

636
        // Set default value for pbr programming delay if services list is not empty
637
        // and delay value is empty
638
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
639
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
640
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
641
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
642
        }
×
643
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
644
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
645
        }
×
646

647
        // Set contract scope for snat svc graph to global by default
648
        if cont.config.SnatSvcContractScope == "" {
2✔
649
                cont.config.SnatSvcContractScope = "global"
1✔
650
        }
1✔
651
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
652
                cont.config.MaxSvcGraphNodes = 32
1✔
653
        }
1✔
654
        if !cont.config.ChainedMode {
2✔
655
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
656
        }
1✔
657
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
658
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
659
                privKey, apicCert, cont.config.AciPrefix,
1✔
660
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
661
                cont.config.AciVrfTenant)
1✔
662
        if err != nil {
1✔
663
                panic(err)
×
664
        }
665

666
        if len(cont.config.ApicHosts) != 0 {
1✔
667
                cont.log.WithFields(logrus.Fields{
×
668
                        "mod":  "APICAPI",
×
669
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
670
                }).Debug("Connecting to APIC to determine the Version")
×
671

×
672
                version, err := cont.apicConn.GetVersion()
×
673
                if err != nil {
×
674
                        cont.log.Error("Could not get APIC version")
×
675
                        panic(err)
×
676
                }
677
                cont.apicConn.CachedVersion = version
×
678
                apicapi.ApicVersion = version
×
679
                if version >= "4.2(4i)" {
×
680
                        cont.apicConn.SnatPbrFltrChain = true
×
681
                } else {
×
682
                        cont.apicConn.SnatPbrFltrChain = false
×
683
                }
×
684
                if version >= "5.2" {
×
685
                        cont.vmmClusterFaultSupported = true
×
686
                }
×
687
        } else { // For unit-tests
1✔
688
                cont.apicConn.SnatPbrFltrChain = true
1✔
689
        }
1✔
690

691
        if !cont.config.ChainedMode {
2✔
692
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
693
                // Make sure Pod/NodeBDs are associated to same VRF.
1✔
694
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
695
                        var expectedVrfRelations []string
×
696
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
697
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
698
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
699
                        if err != nil {
×
700
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
701
                                panic(err)
×
702
                        }
703
                }
704
        }
705

706
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.config.ChainedMode {
1✔
707
                //Clear fault instances when the controller starts
×
708
                cont.clearFaultInstances()
×
709
                //Subscribe for vmmEpPD for a given domain
×
710
                var tnTargetFilterEpg string
×
711
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
712
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
713
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
714
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
715
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
716
                        func(obj apicapi.ApicObject) bool {
×
717
                                cont.vmmEpPDChanged(obj)
×
718
                                return true
×
719
                        },
×
720
                        func(dn string) {
×
721
                                cont.vmmEpPDDeleted(dn)
×
722
                        })
×
723
        }
724

725
        cont.initStaticObjs()
1✔
726

1✔
727
        err = cont.env.PrepareRun(stopCh)
1✔
728
        if err != nil {
1✔
729
                panic(err.Error())
×
730
        }
731

732
        cont.apicConn.FullSyncHook = func() {
1✔
733
                // put a channel into each work queue and wait on it to
×
734
                // checkpoint object syncing in response to new subscription
×
735
                // updates
×
736
                cont.log.Debug("Starting checkpoint")
×
737
                var chans []chan struct{}
×
738
                qs := make([]workqueue.RateLimitingInterface, 0)
×
739
                _, ok := cont.env.(*K8sEnvironment)
×
740
                if ok {
×
741
                        if !cont.config.ChainedMode {
×
742
                                qs = []workqueue.RateLimitingInterface{
×
743
                                        cont.podQueue, cont.netPolQueue, cont.qosQueue,
×
744
                                        cont.serviceQueue, cont.snatQueue, cont.netflowQueue,
×
745
                                        cont.snatNodeInfoQueue, cont.rdConfigQueue, cont.erspanQueue,
×
746
                                }
×
747
                        } else {
×
748
                                qs = []workqueue.RateLimitingInterface{
×
749
                                        cont.podQueue,
×
750
                                }
×
751
                        }
×
752
                }
753
                for _, q := range qs {
×
754
                        c := make(chan struct{})
×
755
                        chans = append(chans, c)
×
756
                        q.Add(c)
×
757
                }
×
758
                for _, c := range chans {
×
759
                        <-c
×
760
                }
×
761
                cont.log.Debug("Checkpoint complete")
×
762
        }
763

764
        if len(cont.config.ApicHosts) != 0 && !cont.config.ChainedMode {
1✔
765
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
766
                cont.scheduleRdConfig()
×
767
        }
×
768

769
        if !cont.config.ChainedMode {
2✔
770
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
771
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
772
                                []string{"hostprotPol"})
1✔
773
                }
1✔
774
        } else {
×
775
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
×
776
                        []string{"fvBD", "fvAp"})
×
777
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
×
778
                        []string{"fvnsVlanInstP"}, "")
×
779
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
×
780
                        []string{"infraRsDomP"}, "")
×
781
                cont.apicConn.AddSubscriptionClass("physDomP",
×
782
                        []string{"physDomP"}, "")
×
783
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
×
784
                        []string{"infraRsVlanNs"}, "")
×
785
                cont.apicConn.AddSubscriptionClass("infraGeneric",
×
786
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
×
787
        }
×
788
        if !cont.config.ChainedMode {
2✔
789
                // When a new class is added for subscription, check if its name attribute
1✔
790
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
791
                // in apicapi.go
1✔
792
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
793
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
794
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
795
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
796
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
797
                }
×
798
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
799
                        subscribeMo)
1✔
800
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
801
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
802
                        []string{"fvRsCons"})
1✔
803
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
804
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
805
                        cont.config.AciVmmController)
1✔
806
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
807
                // Since it is not supported for APIC versions < "5.0"
1✔
808
                cont.addVmmInjectedLabel()
1✔
809
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
810
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
811

1✔
812
                var tnTargetFilter string
1✔
813
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
814
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
815
                                tnTargetFilter += fmt.Sprintf("tn-%s|", tn)
×
816
                        }
×
817
                } else {
1✔
818
                        tnTargetFilter += fmt.Sprintf("tn-%s|tn-%s",
1✔
819
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
820
                }
1✔
821
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
822
                        tnTargetFilter)
1✔
823
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
824
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
825

1✔
826
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
827
                        func(obj apicapi.ApicObject) bool {
1✔
828
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
829
                                return true
×
830
                        },
×
831
                        func(dn string) {
×
832
                                cont.SubnetDeleted(dn)
×
833
                        })
×
834

835
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
836
                        []string{"opflexODev"}, "")
1✔
837

1✔
838
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
839
                        func(obj apicapi.ApicObject) bool {
1✔
840
                                cont.opflexDeviceChanged(obj)
×
841
                                return true
×
842
                        },
×
843
                        func(dn string) {
×
844
                                cont.opflexDeviceDeleted(dn)
×
845
                        })
×
846
        }
847
        go cont.apicConn.Run(stopCh)
1✔
848
}
849

850
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
851
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
852
        ticker := time.NewTicker(seconds * time.Second)
1✔
853
        defer ticker.Stop()
1✔
854

1✔
855
        for {
2✔
856
                select {
1✔
857
                case <-ticker.C:
1✔
858
                        if cont.config.EnableOpflexAgentReconnect {
1✔
NEW
859
                                cont.checkChangeOfOpflexOdevAciPod()
×
NEW
860
                        }
×
861
                        if cont.config.AciMultipod {
1✔
862
                                cont.checkChangeOfOdevAciPod()
×
863
                        }
×
864
                case <-stopCh:
1✔
865
                        return
1✔
866
                }
867
        }
868
}
869

870
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
871
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
872
        ticker := time.NewTicker(seconds * time.Second)
1✔
873
        defer ticker.Stop()
1✔
874

1✔
875
        for {
2✔
876
                select {
1✔
877
                case <-ticker.C:
1✔
878
                        cont.deleteOldOpflexDevices()
1✔
879
                case <-stopCh:
1✔
880
                        return
1✔
881
                }
882
        }
883
}
884

885
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
886
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
887
        ticker := time.NewTicker(seconds * time.Second)
1✔
888
        defer ticker.Stop()
1✔
889

1✔
890
        for {
2✔
891
                select {
1✔
892
                case <-ticker.C:
1✔
893
                        cont.processDelayedEpSlices()
1✔
894
                case <-stopCh:
1✔
895
                        return
1✔
896
                }
897
        }
898
}
899

900
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
901
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
902
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
903
        iteration := 0
1✔
904
        for {
2✔
905
                // To avoid noisy logs, only printing once in 5 minutes
1✔
906
                if iteration%5 == 0 {
2✔
907
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
908
                }
1✔
909
                var nodeInfos []*nodeinfo.NodeInfo
1✔
910
                cont.indexMutex.Lock()
1✔
911
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
912
                        func(nodeInfoObj interface{}) {
2✔
913
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
914
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
915
                        })
1✔
916
                expectedmap := make(map[string]map[string]bool)
1✔
917
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
918
                        for nodename, entry := range glinfo {
2✔
919
                                if _, found := expectedmap[nodename]; !found {
2✔
920
                                        newentry := make(map[string]bool)
1✔
921
                                        newentry[entry.SnatPolicyName] = true
1✔
922
                                        expectedmap[nodename] = newentry
1✔
923
                                } else {
2✔
924
                                        currententry := expectedmap[nodename]
1✔
925
                                        currententry[entry.SnatPolicyName] = true
1✔
926
                                        expectedmap[nodename] = currententry
1✔
927
                                }
1✔
928
                        }
929
                }
930
                cont.indexMutex.Unlock()
1✔
931

1✔
932
                for _, value := range nodeInfos {
2✔
933
                        marked := false
1✔
934
                        policyNames := value.Spec.SnatPolicyNames
1✔
935
                        nodeName := value.ObjectMeta.Name
1✔
936
                        _, ok := expectedmap[nodeName]
1✔
937
                        if !ok && len(policyNames) > 0 {
2✔
938
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
939
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
940
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
941
                                marked = true
1✔
942
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
943
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
944
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
945
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
946
                                marked = true
1✔
947
                        } else {
2✔
948
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
949
                                        // No snatpolicies present
×
950
                                        continue
×
951
                                }
952
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
953
                                if !eq {
2✔
954
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
955
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
956
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
957
                                        marked = true
1✔
958
                                }
1✔
959
                        }
960
                        if marked {
2✔
961
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
962
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
963
                                if err != nil {
1✔
964
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
965
                                        continue
×
966
                                }
967
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
968
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
969
                        } else if iteration%5 == 0 {
2✔
970
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
971
                        }
1✔
972
                }
973
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
974
                iteration++
1✔
975
        }
976
}
977

978
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
979
        queueStop <-chan struct{}) {
1✔
980
        go wait.Until(func() {
2✔
981
                for {
2✔
982
                        syncType, quit := queue.Get()
1✔
983
                        if quit {
2✔
984
                                break
1✔
985
                        }
986
                        var requeue bool
1✔
987
                        if sType, ok := syncType.(string); ok {
2✔
988
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
989
                                        requeue = f()
1✔
990
                                }
1✔
991
                        }
992
                        if requeue {
1✔
993
                                queue.AddRateLimited(syncType)
×
994
                        } else {
1✔
995
                                queue.Forget(syncType)
1✔
996
                        }
1✔
997
                        queue.Done(syncType)
1✔
998
                }
999
        }, time.Second, queueStop)
1000
        <-queueStop
1✔
1001
        queue.ShutDown()
1✔
1002
}
1003

1004
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1005
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1006
}
1✔
1007
func (cont *AciController) scheduleRdConfig() {
×
1008
        cont.syncQueue.AddRateLimited("rdConfig")
×
1009
}
×
1010
func (cont *AciController) scheduleCreateIstioCR() {
×
1011
        cont.syncQueue.AddRateLimited("istioCR")
×
1012
}
×
1013

1014
func (cont *AciController) addVmmInjectedLabel() {
1✔
1015
        if apicapi.ApicVersion >= "5.2" {
1✔
1016
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1017
                if err != nil {
×
1018
                        panic(err.Error())
×
1019
                }
1020
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1021
                if err != nil {
×
1022
                        panic(err.Error())
×
1023
                }
1024
        }
1025
        if apicapi.ApicVersion >= "5.0" {
1✔
1026
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
×
1027
                if err != nil {
×
1028
                        panic(err.Error())
×
1029
                }
1030
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
×
1031
                if err != nil {
×
1032
                        panic(err.Error())
×
1033
                }
1034
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
×
1035
                if err != nil {
×
1036
                        panic(err.Error())
×
1037
                }
1038
        }
1039
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc