• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 8900

29 Mar 2024 06:31PM UTC coverage: 71.015% (+0.1%) from 70.888%
8900

push

travis-pro

web-flow
Merge pull request #1298 from noironetworks/depalert-fix

Upgrade protobuf to version 1.33.0

10785 of 15187 relevant lines covered (71.01%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.48
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        "k8s.io/apimachinery/pkg/labels"
35
        "k8s.io/apimachinery/pkg/util/wait"
36
        "k8s.io/client-go/kubernetes"
37
        "k8s.io/client-go/tools/cache"
38
        "k8s.io/client-go/util/workqueue"
39

40
        "github.com/noironetworks/aci-containers/pkg/apicapi"
41
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
42
        "github.com/noironetworks/aci-containers/pkg/index"
43
        "github.com/noironetworks/aci-containers/pkg/ipam"
44
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
45
        "github.com/noironetworks/aci-containers/pkg/metadata"
46
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
47
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
48
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
49
        "github.com/noironetworks/aci-containers/pkg/util"
50
)
51

52
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
53
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
54
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
55

56
type AciController struct {
57
        log    *logrus.Logger
58
        config *ControllerConfig
59
        env    Environment
60

61
        defaultEg string
62
        defaultSg string
63

64
        unitTestMode bool
65

66
        podQueue            workqueue.RateLimitingInterface
67
        netPolQueue         workqueue.RateLimitingInterface
68
        qosQueue            workqueue.RateLimitingInterface
69
        serviceQueue        workqueue.RateLimitingInterface
70
        snatQueue           workqueue.RateLimitingInterface
71
        netflowQueue        workqueue.RateLimitingInterface
72
        erspanQueue         workqueue.RateLimitingInterface
73
        snatNodeInfoQueue   workqueue.RateLimitingInterface
74
        rdConfigQueue       workqueue.RateLimitingInterface
75
        istioQueue          workqueue.RateLimitingInterface
76
        nodeFabNetAttQueue  workqueue.RateLimitingInterface
77
        netFabConfigQueue   workqueue.RateLimitingInterface
78
        nadVlanMapQueue     workqueue.RateLimitingInterface
79
        fabricVlanPoolQueue workqueue.RateLimitingInterface
80

81
        namespaceIndexer       cache.Indexer
82
        namespaceInformer      cache.Controller
83
        podIndexer             cache.Indexer
84
        podInformer            cache.Controller
85
        endpointsIndexer       cache.Indexer
86
        endpointsInformer      cache.Controller
87
        serviceIndexer         cache.Indexer
88
        serviceInformer        cache.Controller
89
        replicaSetIndexer      cache.Indexer
90
        replicaSetInformer     cache.Controller
91
        deploymentIndexer      cache.Indexer
92
        deploymentInformer     cache.Controller
93
        nodeIndexer            cache.Indexer
94
        nodeInformer           cache.Controller
95
        networkPolicyIndexer   cache.Indexer
96
        networkPolicyInformer  cache.Controller
97
        snatIndexer            cache.Indexer
98
        snatInformer           cache.Controller
99
        snatNodeInfoIndexer    cache.Indexer
100
        snatNodeInformer       cache.Controller
101
        crdInformer            cache.Controller
102
        rdConfigInformer       cache.Controller
103
        rdConfigIndexer        cache.Indexer
104
        qosIndexer             cache.Indexer
105
        qosInformer            cache.Controller
106
        netflowIndexer         cache.Indexer
107
        netflowInformer        cache.Controller
108
        erspanIndexer          cache.Indexer
109
        erspanInformer         cache.Controller
110
        nodePodIfIndexer       cache.Indexer
111
        nodePodIfInformer      cache.Controller
112
        istioIndexer           cache.Indexer
113
        istioInformer          cache.Controller
114
        endpointSliceIndexer   cache.Indexer
115
        endpointSliceInformer  cache.Controller
116
        snatCfgInformer        cache.Controller
117
        updatePod              podUpdateFunc
118
        updateNode             nodeUpdateFunc
119
        updateServiceStatus    serviceUpdateFunc
120
        nodeFabNetAttInformer  cache.SharedIndexInformer
121
        netFabConfigInformer   cache.SharedIndexInformer
122
        nadVlanMapInformer     cache.SharedIndexInformer
123
        fabricVlanPoolInformer cache.SharedIndexInformer
124

125
        indexMutex sync.Mutex
126

127
        configuredPodNetworkIps *netIps
128
        podNetworkIps           *netIps
129
        serviceIps              *ipam.IpCache
130
        staticServiceIps        *netIps
131
        nodeServiceIps          *netIps
132

133
        // index of pods matched by deployments
134
        depPods *index.PodSelectorIndex
135
        // index of pods matched by network policies
136
        netPolPods *index.PodSelectorIndex
137
        // index of pods matched by network policy ingress rules
138
        netPolIngressPods *index.PodSelectorIndex
139
        // index of pods matched by network policy egress rules
140
        netPolEgressPods *index.PodSelectorIndex
141
        // index of IP addresses contained in endpoints objects
142
        endpointsIpIndex cidranger.Ranger
143
        // index of service target ports
144
        targetPortIndex map[string]*portIndexEntry
145
        // index of ip blocks referenced by network policy egress rules
146
        netPolSubnetIndex cidranger.Ranger
147
        // index of pods matched by erspan policies
148
        erspanPolPods *index.PodSelectorIndex
149

150
        apicConn *apicapi.ApicConnection
151

152
        nodeServiceMetaCache map[string]*nodeServiceMeta
153
        nodeACIPod           map[string]aciPodAnnot
154
        nodeACIPodAnnot      map[string]aciPodAnnot
155
        nodeOpflexDevice     map[string]apicapi.ApicSlice
156
        nodePodNetCache      map[string]*nodePodNetMeta
157
        serviceMetaCache     map[string]*serviceMeta
158
        snatPolicyCache      map[string]*ContSnatPolicy
159
        delayedEpSlices      []*DelayedEpSlice
160
        snatServices         map[string]bool
161
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
162
        rdConfigCache        map[string]*rdConfig.RdConfig
163
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
164
        istioCache           map[string]*istiov1.AciIstioOperator
165
        podIftoEp            map[string]*EndPointData
166
        // Node Name and Policy Name
167
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
168
        nodeSyncEnabled     bool
169
        serviceSyncEnabled  bool
170
        snatSyncEnabled     bool
171
        tunnelGetter        *tunnelState
172
        syncQueue           workqueue.RateLimitingInterface
173
        syncProcessors      map[string]func() bool
174
        serviceEndPoints    ServiceEndPointType
175
        crdHandlers         map[string]func(*AciController, <-chan struct{})
176
        stopCh              <-chan struct{}
177
        //index of containerportname to ctrPortNameEntry
178
        ctrPortNameCache map[string]*ctrPortNameEntry
179
        // named networkPolicies
180
        nmPortNp map[string]bool
181
        //maps network policy hash to hpp
182
        hppRef map[string]hppReference
183
        // cache to look for Epg DNs which are bound to Vmm domain
184
        cachedEpgDns             []string
185
        vmmClusterFaultSupported bool
186
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
187
        //Used in Shared mode
188
        sharedEncapCache map[int]*sharedEncapData
189
        // vlan to propertiesList
190
        sharedEncapNfcCache         map[int]*NfcData
191
        sharedEncapNfcVlanMap       map[int]*NfcData
192
        sharedEncapNfcLabelMap      map[string]*NfcData
193
        sharedEncapNfcAppProfileMap map[string]map[int]bool
194
        // nadVlanMap encapLabel to vlan
195
        sharedEncapLabelMap      map[string][]int
196
        lldpIfCache              map[string]string
197
        globalVlanConfig         globalVlanConfig
198
        fabricVlanPoolMap        map[string]map[string]string
199
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
200
        openStackSystemId        string
201
}
202

203
type NfcData struct {
204
        Aeps map[string]bool
205
        Epg  fabattv1.Epg
206
}
207

208
type sharedEncapData struct {
209
        //node to NAD to pods
210
        Pods   map[string]map[string][]string
211
        NetRef map[string]*AdditionalNetworkMeta
212
}
213

214
type globalVlanConfig struct {
215
        SharedPhysDom apicapi.ApicObject
216
}
217

218
type hppReference struct {
219
        RefCount uint              `json:"ref-count,omitempty"`
220
        Npkeys   []string          `json:"npkeys,omitempty"`
221
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
222
}
223

224
type DelayedEpSlice struct {
225
        ServiceKey  string
226
        OldEpSlice  *discovery.EndpointSlice
227
        NewEpSlice  *discovery.EndpointSlice
228
        DelayedTime time.Time
229
}
230

231
type aciPodAnnot struct {
232
        aciPod             string
233
        isSingleOpflexOdev bool
234
        disconnectTime     time.Time
235
        connectTime        time.Time
236
}
237

238
type nodeServiceMeta struct {
239
        serviceEp metadata.ServiceEndpoint
240
}
241

242
type nodePodNetMeta struct {
243
        nodePods            map[string]bool
244
        podNetIps           metadata.NetIps
245
        podNetIpsAnnotation string
246
}
247

248
type openstackOpflexOdevInfo struct {
249
        opflexODevDn map[string]struct{}
250
        fabricPathDn string
251
}
252

253
type serviceMeta struct {
254
        requestedIps     []net.IP
255
        ingressIps       []net.IP
256
        staticIngressIps []net.IP
257
}
258

259
type ipIndexEntry struct {
260
        ipNet net.IPNet
261
        keys  map[string]bool
262
}
263

264
type targetPort struct {
265
        proto v1.Protocol
266
        ports []int
267
}
268

269
type portIndexEntry struct {
270
        port              targetPort
271
        serviceKeys       map[string]bool
272
        networkPolicyKeys map[string]bool
273
}
274

275
type portRangeSnat struct {
276
        start int
277
        end   int
278
}
279

280
// EndPointData holds PodIF data in controller.
281
type EndPointData struct {
282
        MacAddr    string
283
        EPG        string
284
        Namespace  string
285
        AppProfile string
286
}
287

288
type ctrPortNameEntry struct {
289
        // Proto+port->pods
290
        ctrNmpToPods map[string]map[string]bool
291
}
292

293
type LinkData struct {
294
        Link []string
295
        Pods []string
296
}
297

298
type AdditionalNetworkMeta struct {
299
        NetworkName string
300
        EncapVlan   string
301
        //node+localiface->fabricLinks
302
        FabricLink map[string]map[string]LinkData
303
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
304
        Mode       util.EncapMode
305
}
306

307
type ServiceEndPointType interface {
308
        InitClientInformer(kubeClient *kubernetes.Clientset)
309
        Run(stopCh <-chan struct{})
310
        Wait(stopCh <-chan struct{})
311
        UpdateServicesForNode(nodename string)
312
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
313
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
314
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
315
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
316
}
317

318
type serviceEndpoint struct {
319
        cont *AciController
320
}
321
type serviceEndpointSlice struct {
322
        cont *AciController
323
}
324

325
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
326
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
327
}
×
328

329
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
330
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
331
}
×
332

333
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
334
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
335
}
1✔
336

337
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
338
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
339
}
1✔
340

341
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
342
        cache.WaitForCacheSync(stopCh,
1✔
343
                sep.cont.endpointsInformer.HasSynced,
1✔
344
                sep.cont.serviceInformer.HasSynced)
1✔
345
}
1✔
346

347
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
348
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
349
        cache.WaitForCacheSync(stopCh,
1✔
350
                seps.cont.endpointSliceInformer.HasSynced,
1✔
351
                seps.cont.serviceInformer.HasSynced)
1✔
352
}
1✔
353

354
func (e *ipIndexEntry) Network() net.IPNet {
1✔
355
        return e.ipNet
1✔
356
}
1✔
357

358
func newNodePodNetMeta() *nodePodNetMeta {
1✔
359
        return &nodePodNetMeta{
1✔
360
                nodePods: make(map[string]bool),
1✔
361
        }
1✔
362
}
1✔
363

364
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
365
        return workqueue.NewNamedRateLimitingQueue(
1✔
366
                workqueue.NewMaxOfRateLimiter(
1✔
367
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
368
                                10*time.Second),
1✔
369
                        &workqueue.BucketRateLimiter{
1✔
370
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
371
                        },
1✔
372
                ),
1✔
373
                "delta")
1✔
374
}
1✔
375

376
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
377
        cont := &AciController{
1✔
378
                log:          log,
1✔
379
                config:       config,
1✔
380
                env:          env,
1✔
381
                defaultEg:    "",
1✔
382
                defaultSg:    "",
1✔
383
                unitTestMode: unittestmode,
1✔
384

1✔
385
                podQueue:            createQueue("pod"),
1✔
386
                netPolQueue:         createQueue("networkPolicy"),
1✔
387
                qosQueue:            createQueue("qos"),
1✔
388
                netflowQueue:        createQueue("netflow"),
1✔
389
                erspanQueue:         createQueue("erspan"),
1✔
390
                serviceQueue:        createQueue("service"),
1✔
391
                snatQueue:           createQueue("snat"),
1✔
392
                snatNodeInfoQueue:   createQueue("snatnodeinfo"),
1✔
393
                rdConfigQueue:       createQueue("rdconfig"),
1✔
394
                istioQueue:          createQueue("istio"),
1✔
395
                nodeFabNetAttQueue:  createQueue("nodefabricnetworkattachment"),
1✔
396
                netFabConfigQueue:   createQueue("networkfabricconfiguration"),
1✔
397
                nadVlanMapQueue:     createQueue("nadvlanmap"),
1✔
398
                fabricVlanPoolQueue: createQueue("fabricvlanpool"),
1✔
399
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
400
                        &workqueue.BucketRateLimiter{
1✔
401
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
402
                        }, "sync"),
1✔
403

1✔
404
                configuredPodNetworkIps: newNetIps(),
1✔
405
                podNetworkIps:           newNetIps(),
1✔
406
                serviceIps:              ipam.NewIpCache(),
1✔
407
                staticServiceIps:        newNetIps(),
1✔
408
                nodeServiceIps:          newNetIps(),
1✔
409

1✔
410
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
411
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
412
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
413

1✔
414
                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
1✔
415
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
1✔
416
                serviceMetaCache:            make(map[string]*serviceMeta),
1✔
417
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
1✔
418
                snatServices:                make(map[string]bool),
1✔
419
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
1✔
420
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
1✔
421
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
1✔
422
                podIftoEp:                   make(map[string]*EndPointData),
1✔
423
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
424
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
1✔
425
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
1✔
426
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
1✔
427
                nmPortNp:                    make(map[string]bool),
1✔
428
                hppRef:                      make(map[string]hppReference),
1✔
429
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
1✔
430
                sharedEncapCache:            make(map[int]*sharedEncapData),
1✔
431
                sharedEncapNfcCache:         make(map[int]*NfcData),
1✔
432
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
1✔
433
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
1✔
434
                sharedEncapNfcAppProfileMap: make(map[string]map[int]bool),
1✔
435
                sharedEncapLabelMap:         make(map[string][]int),
1✔
436
                lldpIfCache:                 make(map[string]string),
1✔
437
                fabricVlanPoolMap:           make(map[string]map[string]string),
1✔
438
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
1✔
439
        }
1✔
440
        cont.syncProcessors = map[string]func() bool{
1✔
441
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
442
                "rdConfig":       cont.syncRdConfig,
1✔
443
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
444
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
445
                   and aci-containers-operator images for istio.io/istio package
1✔
446

1✔
447
                "istioCR":        cont.createIstioCR,
1✔
448
                */
1✔
449
        }
1✔
450
        return cont
1✔
451
}
1✔
452

453
func (cont *AciController) Init() {
×
454
        if cont.config.ChainedMode {
×
455
                cont.log.Info("In chained mode")
×
456
        }
×
457
        if cont.config.LBType != lbTypeAci && !cont.config.ChainedMode {
×
458
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedNwPol")
×
459
                if err != nil {
×
460
                        panic(err.Error())
×
461
                }
462
        }
463

464
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
465
        if err != nil {
×
466
                cont.log.Error("Could not serialize default endpoint group")
×
467
                panic(err.Error())
×
468
        }
469
        cont.defaultEg = string(egdata)
×
470

×
471
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
472
        if err != nil {
×
473
                cont.log.Error("Could not serialize default security groups")
×
474
                panic(err.Error())
×
475
        }
476
        cont.defaultSg = string(sgdata)
×
477

×
478
        cont.log.Debug("Initializing IPAM")
×
479
        cont.initIpam()
×
480
        // check if the cluster supports endpoint slices
×
481
        // if cluster doesn't have the support fallback to endpoints
×
482
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
483
        if util.IsEndPointSlicesSupported(kubeClient) {
×
484
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
485
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
486
                cont.log.Info("Initializing ServiceEndpointSlices")
×
487
        } else {
×
488
                cont.serviceEndPoints = &serviceEndpoint{}
×
489
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
490
                cont.log.Info("Initializing ServiceEndpoints")
×
491
        }
×
492

493
        err = cont.env.Init(cont)
×
494
        if err != nil {
×
495
                panic(err.Error())
×
496
        }
497
}
498

499
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
500
        store cache.Store, handler func(interface{}) bool,
501
        deleteHandler func(string) bool,
502
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
503
        go wait.Until(func() {
2✔
504
                for {
2✔
505
                        key, quit := queue.Get()
1✔
506
                        if quit {
2✔
507
                                break
1✔
508
                        }
509

510
                        var requeue bool
1✔
511
                        switch key := key.(type) {
1✔
512
                        case chan struct{}:
×
513
                                close(key)
×
514
                        case string:
1✔
515
                                if strings.HasPrefix(key, "DELETED_") {
2✔
516
                                        delKey := strings.Trim(key, "DELETED_")
1✔
517
                                        requeue = deleteHandler(delKey)
1✔
518
                                } else {
2✔
519
                                        obj, exists, err := store.GetByKey(key)
1✔
520
                                        if err != nil {
1✔
521
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
522
                                        }
×
523
                                        //Handle Add/Update/Delete
524
                                        if exists && handler != nil {
2✔
525
                                                requeue = handler(obj)
1✔
526
                                        }
1✔
527
                                        //Handle Post Delete
528
                                        if !exists && postDelHandler != nil {
1✔
529
                                                requeue = postDelHandler()
×
530
                                        }
×
531
                                }
532
                        }
533
                        if requeue {
2✔
534
                                queue.AddRateLimited(key)
1✔
535
                        } else {
2✔
536
                                queue.Forget(key)
1✔
537
                        }
1✔
538
                        queue.Done(key)
1✔
539
                }
540
        }, time.Second, stopCh)
541
        <-stopCh
1✔
542
        queue.ShutDown()
1✔
543
}
544

545
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
546
        return apicapi.ApicSlice{}
1✔
547
}
1✔
548

549
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
550
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
551
}
1✔
552

553
func (cont *AciController) initStaticObjs() {
1✔
554
        cont.env.InitStaticAciObjects()
1✔
555
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
556
                cont.globalStaticObjs())
1✔
557
}
1✔
558

559
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
560
        vmmProv = "Kubernetes"
1✔
561
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
562
                vmmProv = "OpenShift"
×
563
        }
×
564
        return
1✔
565
}
566

567
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
568
        var err error
1✔
569
        var privKey []byte
1✔
570
        var apicCert []byte
1✔
571

1✔
572
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
573

1✔
574
        if cont.config.ApicPrivateKeyPath != "" {
1✔
575
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
576
                if err != nil {
×
577
                        panic(err)
×
578
                }
579
        }
580
        if cont.config.ApicCertPath != "" {
1✔
581
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
582
                if err != nil {
×
583
                        panic(err)
×
584
                }
585
        }
586
        // If not defined, default is 1800
587
        if cont.config.ApicRefreshTimer == "" {
2✔
588
                cont.config.ApicRefreshTimer = "1800"
1✔
589
        }
1✔
590
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
591
        if err != nil {
1✔
592
                panic(err)
×
593
        }
594
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
595

1✔
596
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
597
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
598
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
599
                panic(err)
×
600
        }
601

602
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
603
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
604
                cont.config.ApicRefreshTickerAdjust = "150"
1✔
605
        }
1✔
606
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
607
        if err != nil {
1✔
608
                panic(err)
×
609
        }
610

611
        //If ApicSubscriptionDelay is not defined, default to 100ms
612
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
613
                cont.config.ApicSubscriptionDelay = 100
1✔
614
        }
1✔
615
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
616

1✔
617
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
618
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
619
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
620
        }
1✔
621

622
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
623
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
624
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
625
        }
1✔
626
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
627

1✔
628
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
629
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
630
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
631
        }
1✔
632

633
        // If not defined, default to 32
634
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
635
                cont.config.PodIpPoolChunkSize = 32
1✔
636
        }
1✔
637
        if !cont.config.ChainedMode {
2✔
638
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
639
        }
1✔
640

641
        // If ApicConnectionRetryLimit is not defined, default to 5
642
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
643
                cont.config.ApicConnectionRetryLimit = 5
1✔
644
        }
1✔
645
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
646

1✔
647
        // If not valid, default to 5000-65000
1✔
648
        // other permissible values 1-65000
1✔
649
        defStart := 5000
1✔
650
        defEnd := 65000
1✔
651
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
652
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
653
        }
1✔
654
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
655
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
656
        }
1✔
657
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
658
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
659
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
660
                cont.config.SnatDefaultPortRangeStart = defStart
×
661
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
662
        }
×
663

664
        // Set default value for pbr programming delay if services list is not empty
665
        // and delay value is empty
666
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
667
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
668
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
669
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
670
        }
×
671
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
672
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
673
        }
×
674

675
        // Set contract scope for snat svc graph to global by default
676
        if cont.config.SnatSvcContractScope == "" {
2✔
677
                cont.config.SnatSvcContractScope = "global"
1✔
678
        }
1✔
679
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
680
                cont.config.MaxSvcGraphNodes = 32
1✔
681
        }
1✔
682
        if !cont.config.ChainedMode {
2✔
683
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
684
        }
1✔
685
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
686
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
687
                privKey, apicCert, cont.config.AciPrefix,
1✔
688
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
689
                cont.config.AciVrfTenant)
1✔
690
        if err != nil {
1✔
691
                panic(err)
×
692
        }
693

694
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
695

1✔
696
        if len(cont.config.ApicHosts) != 0 {
1✔
697
        APIC_SWITCH:
×
698
                cont.log.WithFields(logrus.Fields{
×
699
                        "mod":  "APICAPI",
×
700
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
701
                }).Debug("Connecting to APIC to determine the Version")
×
702

×
703
                version, err := cont.apicConn.GetVersion()
×
704
                if err != nil {
×
705
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
706
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
707
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
708
                        goto APIC_SWITCH
×
709
                }
710
                cont.apicConn.CachedVersion = version
×
711
                apicapi.ApicVersion = version
×
712
                if version >= "4.2(4i)" {
×
713
                        cont.apicConn.SnatPbrFltrChain = true
×
714
                } else {
×
715
                        cont.apicConn.SnatPbrFltrChain = false
×
716
                }
×
717
                if version >= "5.2" {
×
718
                        cont.vmmClusterFaultSupported = true
×
719
                }
×
720
        } else { // For unit-tests
1✔
721
                cont.apicConn.SnatPbrFltrChain = true
1✔
722
        }
1✔
723

724
        if !cont.config.ChainedMode {
2✔
725
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
726
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
727
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
728
                        var expectedVrfRelations []string
×
729
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
730
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
731
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
732
                        if err != nil {
×
733
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
734
                                panic(err)
×
735
                        }
736
                }
737
        }
738

739
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.config.ChainedMode {
1✔
740
                //Clear fault instances when the controller starts
×
741
                cont.clearFaultInstances()
×
742
                //Subscribe for vmmEpPD for a given domain
×
743
                var tnTargetFilterEpg string
×
744
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
745
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
746
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
747
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
748
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
749
                        func(obj apicapi.ApicObject) bool {
×
750
                                cont.vmmEpPDChanged(obj)
×
751
                                return true
×
752
                        },
×
753
                        func(dn string) {
×
754
                                cont.vmmEpPDDeleted(dn)
×
755
                        })
×
756
        }
757

758
        cont.initStaticObjs()
1✔
759

1✔
760
        err = cont.env.PrepareRun(stopCh)
1✔
761
        if err != nil {
1✔
762
                panic(err.Error())
×
763
        }
764

765
        cont.apicConn.FullSyncHook = func() {
1✔
766
                // put a channel into each work queue and wait on it to
×
767
                // checkpoint object syncing in response to new subscription
×
768
                // updates
×
769
                cont.log.Debug("Starting checkpoint")
×
770
                var chans []chan struct{}
×
771
                qs := make([]workqueue.RateLimitingInterface, 0)
×
772
                _, ok := cont.env.(*K8sEnvironment)
×
773
                if ok {
×
774
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
775
                        if !cont.config.ChainedMode {
×
776
                                if !cont.config.DisableHppRendering {
×
777
                                        qs = append(qs, cont.netPolQueue)
×
778
                                }
×
779
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
780
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
781
                                        cont.rdConfigQueue, cont.erspanQueue)
×
782
                        }
783
                }
784
                for _, q := range qs {
×
785
                        c := make(chan struct{})
×
786
                        chans = append(chans, c)
×
787
                        q.Add(c)
×
788
                }
×
789
                for _, c := range chans {
×
790
                        <-c
×
791
                }
×
792
                cont.log.Debug("Checkpoint complete")
×
793
        }
794

795
        if len(cont.config.ApicHosts) != 0 && !cont.config.ChainedMode {
1✔
796
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
797
                cont.scheduleRdConfig()
×
798
                if strings.Contains(cont.config.Flavor, "openstack") {
×
799
                        cont.setOpenStackSystemId()
×
800
                }
×
801
        }
802

803
        if !cont.config.ChainedMode {
2✔
804
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
805
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
806
                                []string{"hostprotPol"})
1✔
807
                }
1✔
808
        } else {
1✔
809
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
810
                        []string{"fvBD", "fvAp"})
1✔
811
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
812
                        []string{"fvnsVlanInstP"}, "")
1✔
813
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
814
                        []string{"infraRsDomP"}, "")
1✔
815
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
816
                        []string{"physDomP"}, "")
1✔
817
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
818
                        []string{"infraRsVlanNs"}, "")
1✔
819
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
820
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
821
        }
1✔
822
        if !cont.config.ChainedMode {
2✔
823
                // When a new class is added for subscriptio, check if its name attribute
1✔
824
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
825
                // in apicapi.go
1✔
826
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
827
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
828
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
829
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
830
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
831
                }
×
832
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
833
                        subscribeMo)
1✔
834
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
835
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
836
                        []string{"fvRsCons"})
1✔
837
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
838
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
839
                        cont.config.AciVmmController)
1✔
840
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
841
                // Since it is not supported for APIC versions < "5.0"
1✔
842
                cont.addVmmInjectedLabel()
1✔
843
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
844
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
845

1✔
846
                var tnTargetFilter string
1✔
847
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
848
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
849
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
850
                        }
×
851
                } else {
1✔
852
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
853
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
854
                }
1✔
855
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
856
                        tnTargetFilter)
1✔
857
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
858
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
859

1✔
860
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
861
                        func(obj apicapi.ApicObject) bool {
1✔
862
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
863
                                return true
×
864
                        },
×
865
                        func(dn string) {
×
866
                                cont.SubnetDeleted(dn)
×
867
                        })
×
868

869
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
870
                        []string{"opflexODev"}, "")
1✔
871

1✔
872
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
873
                        func(obj apicapi.ApicObject) bool {
1✔
874
                                cont.opflexDeviceChanged(obj)
×
875
                                return true
×
876
                        },
×
877
                        func(dn string) {
×
878
                                cont.opflexDeviceDeleted(dn)
×
879
                        })
×
880
        }
881
        go cont.apicConn.Run(stopCh)
1✔
882
}
883

884
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
885
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
886
        ticker := time.NewTicker(seconds * time.Second)
1✔
887
        defer ticker.Stop()
1✔
888

1✔
889
        for {
2✔
890
                select {
1✔
891
                case <-ticker.C:
1✔
892
                        if cont.config.EnableOpflexAgentReconnect {
1✔
893
                                cont.checkChangeOfOpflexOdevAciPod()
×
894
                        }
×
895
                        if cont.config.AciMultipod {
1✔
896
                                cont.checkChangeOfOdevAciPod()
×
897
                        }
×
898
                case <-stopCh:
1✔
899
                        return
1✔
900
                }
901
        }
902
}
903

904
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
905
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
906
        ticker := time.NewTicker(seconds * time.Second)
1✔
907
        defer ticker.Stop()
1✔
908

1✔
909
        for {
2✔
910
                select {
1✔
911
                case <-ticker.C:
1✔
912
                        cont.deleteOldOpflexDevices()
1✔
913
                case <-stopCh:
1✔
914
                        return
1✔
915
                }
916
        }
917
}
918

919
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
920
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
921
        ticker := time.NewTicker(seconds * time.Second)
1✔
922
        defer ticker.Stop()
1✔
923

1✔
924
        for {
2✔
925
                select {
1✔
926
                case <-ticker.C:
1✔
927
                        cont.processDelayedEpSlices()
1✔
928
                case <-stopCh:
1✔
929
                        return
1✔
930
                }
931
        }
932
}
933

934
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
935
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
936
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
937
        iteration := 0
1✔
938
        for {
2✔
939
                // To avoid noisy logs, only printing once in 5 minutes
1✔
940
                if iteration%5 == 0 {
2✔
941
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
942
                }
1✔
943
                var nodeInfos []*nodeinfo.NodeInfo
1✔
944
                cont.indexMutex.Lock()
1✔
945
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
946
                        func(nodeInfoObj interface{}) {
2✔
947
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
948
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
949
                        })
1✔
950
                expectedmap := make(map[string]map[string]bool)
1✔
951
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
952
                        for nodename, entry := range glinfo {
2✔
953
                                if _, found := expectedmap[nodename]; !found {
2✔
954
                                        newentry := make(map[string]bool)
1✔
955
                                        newentry[entry.SnatPolicyName] = true
1✔
956
                                        expectedmap[nodename] = newentry
1✔
957
                                } else {
2✔
958
                                        currententry := expectedmap[nodename]
1✔
959
                                        currententry[entry.SnatPolicyName] = true
1✔
960
                                        expectedmap[nodename] = currententry
1✔
961
                                }
1✔
962
                        }
963
                }
964
                cont.indexMutex.Unlock()
1✔
965

1✔
966
                for _, value := range nodeInfos {
2✔
967
                        marked := false
1✔
968
                        policyNames := value.Spec.SnatPolicyNames
1✔
969
                        nodeName := value.ObjectMeta.Name
1✔
970
                        _, ok := expectedmap[nodeName]
1✔
971
                        if !ok && len(policyNames) > 0 {
2✔
972
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
973
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
974
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
975
                                marked = true
1✔
976
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
977
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
978
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
979
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
980
                                marked = true
1✔
981
                        } else {
2✔
982
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
983
                                        // No snatpolicies present
×
984
                                        continue
×
985
                                }
986
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
987
                                if !eq {
2✔
988
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
989
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
990
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
991
                                        marked = true
1✔
992
                                }
1✔
993
                        }
994
                        if marked {
2✔
995
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
996
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
997
                                if err != nil {
1✔
998
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
999
                                        continue
×
1000
                                }
1001
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1002
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1003
                        } else if iteration%5 == 0 {
2✔
1004
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1005
                        }
1✔
1006
                }
1007
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1008
                iteration++
1✔
1009
        }
1010
}
1011

1012
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1013
        queueStop <-chan struct{}) {
1✔
1014
        go wait.Until(func() {
2✔
1015
                for {
2✔
1016
                        syncType, quit := queue.Get()
1✔
1017
                        if quit {
2✔
1018
                                break
1✔
1019
                        }
1020
                        var requeue bool
1✔
1021
                        if sType, ok := syncType.(string); ok {
2✔
1022
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1023
                                        requeue = f()
1✔
1024
                                }
1✔
1025
                        }
1026
                        if requeue {
1✔
1027
                                queue.AddRateLimited(syncType)
×
1028
                        } else {
1✔
1029
                                queue.Forget(syncType)
1✔
1030
                        }
1✔
1031
                        queue.Done(syncType)
1✔
1032
                }
1033
        }, time.Second, queueStop)
1034
        <-queueStop
1✔
1035
        queue.ShutDown()
1✔
1036
}
1037

1038
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1039
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1040
}
1✔
1041
func (cont *AciController) scheduleRdConfig() {
×
1042
        cont.syncQueue.AddRateLimited("rdConfig")
×
1043
}
×
1044
func (cont *AciController) scheduleCreateIstioCR() {
×
1045
        cont.syncQueue.AddRateLimited("istioCR")
×
1046
}
×
1047

1048
func (cont *AciController) addVmmInjectedLabel() {
1✔
1049
        if apicapi.ApicVersion >= "5.2" {
1✔
1050
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1051
                if err != nil {
×
1052
                        panic(err.Error())
×
1053
                }
1054
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1055
                if err != nil {
×
1056
                        panic(err.Error())
×
1057
                }
1058
        }
1059
        if apicapi.ApicVersion >= "5.0" {
2✔
1060
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1061
                if err != nil {
1✔
1062
                        panic(err.Error())
×
1063
                }
1064
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1065
                if err != nil {
1✔
1066
                        panic(err.Error())
×
1067
                }
1068
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1069
                if err != nil {
1✔
1070
                        panic(err.Error())
×
1071
                }
1072
        }
1073
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc