• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 8884

27 Mar 2024 02:32PM UTC coverage: 71.594% (+0.7%) from 70.888%
8884

Pull #1295

travis-pro

smshareef
Add missing unit tests

Added UTs for hostagent/fabricvlanpools.go, hostagent/ipam.go.
Added UT for the case where the static IP of service is updated
and the status conditions are not changed.
Pull Request #1295: Add missing unit tests

10757 of 15025 relevant lines covered (71.59%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.61
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        "k8s.io/apimachinery/pkg/labels"
35
        "k8s.io/apimachinery/pkg/util/wait"
36
        "k8s.io/client-go/kubernetes"
37
        "k8s.io/client-go/tools/cache"
38
        "k8s.io/client-go/util/workqueue"
39

40
        "github.com/noironetworks/aci-containers/pkg/apicapi"
41
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
42
        "github.com/noironetworks/aci-containers/pkg/index"
43
        "github.com/noironetworks/aci-containers/pkg/ipam"
44
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
45
        "github.com/noironetworks/aci-containers/pkg/metadata"
46
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
47
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
48
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
49
        "github.com/noironetworks/aci-containers/pkg/util"
50
)
51

52
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
53
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
54
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
55

56
type AciController struct {
57
        log    *logrus.Logger
58
        config *ControllerConfig
59
        env    Environment
60

61
        defaultEg string
62
        defaultSg string
63

64
        unitTestMode bool
65

66
        podQueue            workqueue.RateLimitingInterface
67
        netPolQueue         workqueue.RateLimitingInterface
68
        qosQueue            workqueue.RateLimitingInterface
69
        serviceQueue        workqueue.RateLimitingInterface
70
        snatQueue           workqueue.RateLimitingInterface
71
        netflowQueue        workqueue.RateLimitingInterface
72
        erspanQueue         workqueue.RateLimitingInterface
73
        snatNodeInfoQueue   workqueue.RateLimitingInterface
74
        rdConfigQueue       workqueue.RateLimitingInterface
75
        istioQueue          workqueue.RateLimitingInterface
76
        nodeFabNetAttQueue  workqueue.RateLimitingInterface
77
        netFabConfigQueue   workqueue.RateLimitingInterface
78
        nadVlanMapQueue     workqueue.RateLimitingInterface
79
        fabricVlanPoolQueue workqueue.RateLimitingInterface
80

81
        namespaceIndexer       cache.Indexer
82
        namespaceInformer      cache.Controller
83
        podIndexer             cache.Indexer
84
        podInformer            cache.Controller
85
        endpointsIndexer       cache.Indexer
86
        endpointsInformer      cache.Controller
87
        serviceIndexer         cache.Indexer
88
        serviceInformer        cache.Controller
89
        replicaSetIndexer      cache.Indexer
90
        replicaSetInformer     cache.Controller
91
        deploymentIndexer      cache.Indexer
92
        deploymentInformer     cache.Controller
93
        nodeIndexer            cache.Indexer
94
        nodeInformer           cache.Controller
95
        networkPolicyIndexer   cache.Indexer
96
        networkPolicyInformer  cache.Controller
97
        snatIndexer            cache.Indexer
98
        snatInformer           cache.Controller
99
        snatNodeInfoIndexer    cache.Indexer
100
        snatNodeInformer       cache.Controller
101
        crdInformer            cache.Controller
102
        rdConfigInformer       cache.Controller
103
        rdConfigIndexer        cache.Indexer
104
        qosIndexer             cache.Indexer
105
        qosInformer            cache.Controller
106
        netflowIndexer         cache.Indexer
107
        netflowInformer        cache.Controller
108
        erspanIndexer          cache.Indexer
109
        erspanInformer         cache.Controller
110
        nodePodIfIndexer       cache.Indexer
111
        nodePodIfInformer      cache.Controller
112
        istioIndexer           cache.Indexer
113
        istioInformer          cache.Controller
114
        endpointSliceIndexer   cache.Indexer
115
        endpointSliceInformer  cache.Controller
116
        snatCfgInformer        cache.Controller
117
        updatePod              podUpdateFunc
118
        updateNode             nodeUpdateFunc
119
        updateServiceStatus    serviceUpdateFunc
120
        nodeFabNetAttInformer  cache.SharedIndexInformer
121
        netFabConfigInformer   cache.SharedIndexInformer
122
        nadVlanMapInformer     cache.SharedIndexInformer
123
        fabricVlanPoolInformer cache.SharedIndexInformer
124

125
        indexMutex sync.Mutex
126

127
        configuredPodNetworkIps *netIps
128
        podNetworkIps           *netIps
129
        serviceIps              *ipam.IpCache
130
        staticServiceIps        *netIps
131
        nodeServiceIps          *netIps
132

133
        // index of pods matched by deployments
134
        depPods *index.PodSelectorIndex
135
        // index of pods matched by network policies
136
        netPolPods *index.PodSelectorIndex
137
        // index of pods matched by network policy ingress rules
138
        netPolIngressPods *index.PodSelectorIndex
139
        // index of pods matched by network policy egress rules
140
        netPolEgressPods *index.PodSelectorIndex
141
        // index of IP addresses contained in endpoints objects
142
        endpointsIpIndex cidranger.Ranger
143
        // index of service target ports
144
        targetPortIndex map[string]*portIndexEntry
145
        // index of ip blocks referenced by network policy egress rules
146
        netPolSubnetIndex cidranger.Ranger
147
        // index of pods matched by erspan policies
148
        erspanPolPods *index.PodSelectorIndex
149

150
        apicConn *apicapi.ApicConnection
151

152
        nodeServiceMetaCache map[string]*nodeServiceMeta
153
        nodeACIPod           map[string]aciPodAnnot
154
        nodeACIPodAnnot      map[string]aciPodAnnot
155
        nodeOpflexDevice     map[string]apicapi.ApicSlice
156
        nodePodNetCache      map[string]*nodePodNetMeta
157
        serviceMetaCache     map[string]*serviceMeta
158
        snatPolicyCache      map[string]*ContSnatPolicy
159
        delayedEpSlices      []*DelayedEpSlice
160
        snatServices         map[string]bool
161
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
162
        rdConfigCache        map[string]*rdConfig.RdConfig
163
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
164
        istioCache           map[string]*istiov1.AciIstioOperator
165
        podIftoEp            map[string]*EndPointData
166
        // Node Name and Policy Name
167
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
168
        nodeSyncEnabled     bool
169
        serviceSyncEnabled  bool
170
        snatSyncEnabled     bool
171
        tunnelGetter        *tunnelState
172
        syncQueue           workqueue.RateLimitingInterface
173
        syncProcessors      map[string]func() bool
174
        serviceEndPoints    ServiceEndPointType
175
        crdHandlers         map[string]func(*AciController, <-chan struct{})
176
        stopCh              <-chan struct{}
177
        //index of containerportname to ctrPortNameEntry
178
        ctrPortNameCache map[string]*ctrPortNameEntry
179
        // named networkPolicies
180
        nmPortNp map[string]bool
181
        //maps network policy hash to hpp
182
        hppRef map[string]hppReference
183
        // cache to look for Epg DNs which are bound to Vmm domain
184
        cachedEpgDns             []string
185
        vmmClusterFaultSupported bool
186
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
187
        //Used in Shared mode
188
        sharedEncapCache map[int]*sharedEncapData
189
        // vlan to propertiesList
190
        sharedEncapNfcCache    map[int]*NfcData
191
        sharedEncapNfcVlanMap  map[int]*NfcData
192
        sharedEncapNfcLabelMap map[string]*NfcData
193
        // nadVlanMap encapLabel to vlan
194
        sharedEncapLabelMap map[string][]int
195
        lldpIfCache         map[string]string
196
        globalVlanConfig    globalVlanConfig
197
        fabricVlanPoolMap   map[string]map[string]string
198
}
199

200
type NfcData struct {
201
        Aeps map[string]bool
202
        Epg  fabattv1.Epg
203
}
204

205
type sharedEncapData struct {
206
        //node to NAD to pods
207
        Pods   map[string]map[string][]string
208
        NetRef map[string]*AdditionalNetworkMeta
209
}
210

211
type globalVlanConfig struct {
212
        SharedPhysDom apicapi.ApicObject
213
}
214

215
type hppReference struct {
216
        RefCount uint              `json:"ref-count,omitempty"`
217
        Npkeys   []string          `json:"npkeys,omitempty"`
218
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
219
}
220

221
type DelayedEpSlice struct {
222
        ServiceKey  string
223
        OldEpSlice  *discovery.EndpointSlice
224
        NewEpSlice  *discovery.EndpointSlice
225
        DelayedTime time.Time
226
}
227

228
// aciPodAnnot is the value type of the nodeACIPod and nodeACIPodAnnot maps on
// AciController: an ACI pod string plus opflex-device connection bookkeeping.
type aciPodAnnot struct {
	aciPod             string
	isSingleOpflexOdev bool
	disconnectTime     time.Time
	connectTime        time.Time
}
234

235
type nodeServiceMeta struct {
236
        serviceEp metadata.ServiceEndpoint
237
}
238

239
type nodePodNetMeta struct {
240
        nodePods            map[string]bool
241
        podNetIps           metadata.NetIps
242
        podNetIpsAnnotation string
243
}
244

245
// serviceMeta is the value type of AciController.serviceMetaCache and holds
// three lists of IP addresses for a service.
type serviceMeta struct {
	requestedIps     []net.IP
	ingressIps       []net.IP
	staticIngressIps []net.IP
}
250

251
// ipIndexEntry associates an IP network with a set of string keys.
type ipIndexEntry struct {
	ipNet net.IPNet
	keys  map[string]bool
}
255

256
type targetPort struct {
257
        proto v1.Protocol
258
        ports []int
259
}
260

261
type portIndexEntry struct {
262
        port              targetPort
263
        serviceKeys       map[string]bool
264
        networkPolicyKeys map[string]bool
265
}
266

267
// portRangeSnat holds the start and end of a port range.
type portRangeSnat struct {
	start int
	end   int
}
271

272
// EndPointData holds the PodIF data the controller keeps per entry of
// AciController.podIftoEp.
type EndPointData struct {
	MacAddr    string
	EPG        string
	Namespace  string
	AppProfile string
}
279

280
// ctrPortNameEntry is the value type of AciController.ctrPortNameCache.
type ctrPortNameEntry struct {
	// ctrNmpToPods maps proto+port to a set of pods.
	ctrNmpToPods map[string]map[string]bool
}
284

285
// LinkData holds a list of links and a list of pods. It is the innermost
// value type of AdditionalNetworkMeta.FabricLink.
type LinkData struct {
	Link []string
	Pods []string
}
289

290
type AdditionalNetworkMeta struct {
291
        NetworkName string
292
        EncapVlan   string
293
        //node+localiface->fabricLinks
294
        FabricLink map[string]map[string]LinkData
295
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
296
        Mode       util.EncapMode
297
}
298

299
type ServiceEndPointType interface {
300
        InitClientInformer(kubeClient *kubernetes.Clientset)
301
        Run(stopCh <-chan struct{})
302
        Wait(stopCh <-chan struct{})
303
        UpdateServicesForNode(nodename string)
304
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
305
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
306
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
307
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
308
}
309

310
type serviceEndpoint struct {
311
        cont *AciController
312
}
313
type serviceEndpointSlice struct {
314
        cont *AciController
315
}
316

317
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
318
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
319
}
×
320

321
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
322
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
323
}
×
324

325
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
326
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
327
}
1✔
328

329
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
330
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
331
}
1✔
332

333
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
334
        cache.WaitForCacheSync(stopCh,
1✔
335
                sep.cont.endpointsInformer.HasSynced,
1✔
336
                sep.cont.serviceInformer.HasSynced)
1✔
337
}
1✔
338

339
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
340
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
341
        cache.WaitForCacheSync(stopCh,
1✔
342
                seps.cont.endpointSliceInformer.HasSynced,
1✔
343
                seps.cont.serviceInformer.HasSynced)
1✔
344
}
1✔
345

346
func (e *ipIndexEntry) Network() net.IPNet {
1✔
347
        return e.ipNet
1✔
348
}
1✔
349

350
func newNodePodNetMeta() *nodePodNetMeta {
1✔
351
        return &nodePodNetMeta{
1✔
352
                nodePods: make(map[string]bool),
1✔
353
        }
1✔
354
}
1✔
355

356
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
357
        return workqueue.NewNamedRateLimitingQueue(
1✔
358
                workqueue.NewMaxOfRateLimiter(
1✔
359
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
360
                                10*time.Second),
1✔
361
                        &workqueue.BucketRateLimiter{
1✔
362
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
363
                        },
1✔
364
                ),
1✔
365
                "delta")
1✔
366
}
1✔
367

368
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
369
        cont := &AciController{
1✔
370
                log:          log,
1✔
371
                config:       config,
1✔
372
                env:          env,
1✔
373
                defaultEg:    "",
1✔
374
                defaultSg:    "",
1✔
375
                unitTestMode: unittestmode,
1✔
376

1✔
377
                podQueue:            createQueue("pod"),
1✔
378
                netPolQueue:         createQueue("networkPolicy"),
1✔
379
                qosQueue:            createQueue("qos"),
1✔
380
                netflowQueue:        createQueue("netflow"),
1✔
381
                erspanQueue:         createQueue("erspan"),
1✔
382
                serviceQueue:        createQueue("service"),
1✔
383
                snatQueue:           createQueue("snat"),
1✔
384
                snatNodeInfoQueue:   createQueue("snatnodeinfo"),
1✔
385
                rdConfigQueue:       createQueue("rdconfig"),
1✔
386
                istioQueue:          createQueue("istio"),
1✔
387
                nodeFabNetAttQueue:  createQueue("nodefabricnetworkattachment"),
1✔
388
                netFabConfigQueue:   createQueue("networkfabricconfiguration"),
1✔
389
                nadVlanMapQueue:     createQueue("nadvlanmap"),
1✔
390
                fabricVlanPoolQueue: createQueue("fabricvlanpool"),
1✔
391
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
392
                        &workqueue.BucketRateLimiter{
1✔
393
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
394
                        }, "sync"),
1✔
395

1✔
396
                configuredPodNetworkIps: newNetIps(),
1✔
397
                podNetworkIps:           newNetIps(),
1✔
398
                serviceIps:              ipam.NewIpCache(),
1✔
399
                staticServiceIps:        newNetIps(),
1✔
400
                nodeServiceIps:          newNetIps(),
1✔
401

1✔
402
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
403
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
404
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
405

1✔
406
                nodeServiceMetaCache:   make(map[string]*nodeServiceMeta),
1✔
407
                nodePodNetCache:        make(map[string]*nodePodNetMeta),
1✔
408
                serviceMetaCache:       make(map[string]*serviceMeta),
1✔
409
                snatPolicyCache:        make(map[string]*ContSnatPolicy),
1✔
410
                snatServices:           make(map[string]bool),
1✔
411
                snatNodeInfoCache:      make(map[string]*nodeinfo.NodeInfo),
1✔
412
                rdConfigCache:          make(map[string]*rdConfig.RdConfig),
1✔
413
                rdConfigSubnetCache:    make(map[string]*rdConfig.RdConfigSpec),
1✔
414
                podIftoEp:              make(map[string]*EndPointData),
1✔
415
                snatGlobalInfoCache:    make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
416
                istioCache:             make(map[string]*istiov1.AciIstioOperator),
1✔
417
                crdHandlers:            make(map[string]func(*AciController, <-chan struct{})),
1✔
418
                ctrPortNameCache:       make(map[string]*ctrPortNameEntry),
1✔
419
                nmPortNp:               make(map[string]bool),
1✔
420
                hppRef:                 make(map[string]hppReference),
1✔
421
                additionalNetworkCache: make(map[string]*AdditionalNetworkMeta),
1✔
422
                sharedEncapCache:       make(map[int]*sharedEncapData),
1✔
423
                sharedEncapNfcCache:    make(map[int]*NfcData),
1✔
424
                sharedEncapNfcVlanMap:  make(map[int]*NfcData),
1✔
425
                sharedEncapNfcLabelMap: make(map[string]*NfcData),
1✔
426
                sharedEncapLabelMap:    make(map[string][]int),
1✔
427
                lldpIfCache:            make(map[string]string),
1✔
428
                fabricVlanPoolMap:      make(map[string]map[string]string),
1✔
429
        }
1✔
430
        cont.syncProcessors = map[string]func() bool{
1✔
431
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
432
                "rdConfig":       cont.syncRdConfig,
1✔
433
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
434
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
435
                   and aci-containers-operator images for istio.io/istio package
1✔
436

1✔
437
                "istioCR":        cont.createIstioCR,
1✔
438
                */
1✔
439
        }
1✔
440
        return cont
1✔
441
}
1✔
442

443
func (cont *AciController) Init() {
×
444
        if cont.config.ChainedMode {
×
445
                cont.log.Info("In chained mode")
×
446
        }
×
447
        if cont.config.LBType != lbTypeAci && !cont.config.ChainedMode {
×
448
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedNwPol")
×
449
                if err != nil {
×
450
                        panic(err.Error())
×
451
                }
452
        }
453

454
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
455
        if err != nil {
×
456
                cont.log.Error("Could not serialize default endpoint group")
×
457
                panic(err.Error())
×
458
        }
459
        cont.defaultEg = string(egdata)
×
460

×
461
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
462
        if err != nil {
×
463
                cont.log.Error("Could not serialize default security groups")
×
464
                panic(err.Error())
×
465
        }
466
        cont.defaultSg = string(sgdata)
×
467

×
468
        cont.log.Debug("Initializing IPAM")
×
469
        cont.initIpam()
×
470
        // check if the cluster supports endpoint slices
×
471
        // if cluster doesn't have the support fallback to endpoints
×
472
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
473
        if util.IsEndPointSlicesSupported(kubeClient) {
×
474
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
475
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
476
                cont.log.Info("Initializing ServiceEndpointSlices")
×
477
        } else {
×
478
                cont.serviceEndPoints = &serviceEndpoint{}
×
479
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
480
                cont.log.Info("Initializing ServiceEndpoints")
×
481
        }
×
482

483
        err = cont.env.Init(cont)
×
484
        if err != nil {
×
485
                panic(err.Error())
×
486
        }
487
}
488

489
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
490
        store cache.Store, handler func(interface{}) bool,
491
        deleteHandler func(string) bool,
492
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
493
        go wait.Until(func() {
2✔
494
                for {
2✔
495
                        key, quit := queue.Get()
1✔
496
                        if quit {
2✔
497
                                break
1✔
498
                        }
499

500
                        var requeue bool
1✔
501
                        switch key := key.(type) {
1✔
502
                        case chan struct{}:
×
503
                                close(key)
×
504
                        case string:
1✔
505
                                if strings.HasPrefix(key, "DELETED_") {
2✔
506
                                        delKey := strings.Trim(key, "DELETED_")
1✔
507
                                        requeue = deleteHandler(delKey)
1✔
508
                                } else {
2✔
509
                                        obj, exists, err := store.GetByKey(key)
1✔
510
                                        if err != nil {
1✔
511
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
512
                                        }
×
513
                                        //Handle Add/Update/Delete
514
                                        if exists && handler != nil {
2✔
515
                                                requeue = handler(obj)
1✔
516
                                        }
1✔
517
                                        //Handle Post Delete
518
                                        if !exists && postDelHandler != nil {
1✔
519
                                                requeue = postDelHandler()
×
520
                                        }
×
521
                                }
522
                        }
523
                        if requeue {
2✔
524
                                queue.AddRateLimited(key)
1✔
525
                        } else {
2✔
526
                                queue.Forget(key)
1✔
527
                        }
1✔
528
                        queue.Done(key)
1✔
529
                }
530
        }, time.Second, stopCh)
531
        <-stopCh
1✔
532
        queue.ShutDown()
1✔
533
}
534

535
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
536
        return apicapi.ApicSlice{}
1✔
537
}
1✔
538

539
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
540
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
541
}
1✔
542

543
func (cont *AciController) initStaticObjs() {
1✔
544
        cont.env.InitStaticAciObjects()
1✔
545
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
546
                cont.globalStaticObjs())
1✔
547
}
1✔
548

549
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
550
        vmmProv = "Kubernetes"
1✔
551
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
552
                vmmProv = "OpenShift"
×
553
        }
×
554
        return
1✔
555
}
556

557
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
558
        var err error
1✔
559
        var privKey []byte
1✔
560
        var apicCert []byte
1✔
561

1✔
562
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
563

1✔
564
        if cont.config.ApicPrivateKeyPath != "" {
1✔
565
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
566
                if err != nil {
×
567
                        panic(err)
×
568
                }
569
        }
570
        if cont.config.ApicCertPath != "" {
1✔
571
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
572
                if err != nil {
×
573
                        panic(err)
×
574
                }
575
        }
576
        // If not defined, default is 1800
577
        if cont.config.ApicRefreshTimer == "" {
2✔
578
                cont.config.ApicRefreshTimer = "1800"
1✔
579
        }
1✔
580
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
581
        if err != nil {
1✔
582
                panic(err)
×
583
        }
584
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
585

1✔
586
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
587
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
588
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
589
                panic(err)
×
590
        }
591

592
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
593
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
594
                cont.config.ApicRefreshTickerAdjust = "150"
1✔
595
        }
1✔
596
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
597
        if err != nil {
1✔
598
                panic(err)
×
599
        }
600

601
        //If ApicSubscriptionDelay is not defined, default to 100ms
602
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
603
                cont.config.ApicSubscriptionDelay = 100
1✔
604
        }
1✔
605
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
606

1✔
607
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
608
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
609
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
610
        }
1✔
611

612
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
613
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
614
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
615
        }
1✔
616
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
617

1✔
618
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
619
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
620
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
621
        }
1✔
622

623
        // If not defined, default to 32
624
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
625
                cont.config.PodIpPoolChunkSize = 32
1✔
626
        }
1✔
627
        if !cont.config.ChainedMode {
2✔
628
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
629
        }
1✔
630

631
        // If ApicConnectionRetryLimit is not defined, default to 5
632
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
633
                cont.config.ApicConnectionRetryLimit = 5
1✔
634
        }
1✔
635
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
636

1✔
637
        // If not valid, default to 5000-65000
1✔
638
        // other permissible values 1-65000
1✔
639
        defStart := 5000
1✔
640
        defEnd := 65000
1✔
641
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
642
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
643
        }
1✔
644
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
645
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
646
        }
1✔
647
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
648
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
649
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
650
                cont.config.SnatDefaultPortRangeStart = defStart
×
651
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
652
        }
×
653

654
        // Set default value for pbr programming delay if services list is not empty
655
        // and delay value is empty
656
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
657
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
658
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
659
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
660
        }
×
661
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
662
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
663
        }
×
664

665
        // Set contract scope for snat svc graph to global by default
666
        if cont.config.SnatSvcContractScope == "" {
2✔
667
                cont.config.SnatSvcContractScope = "global"
1✔
668
        }
1✔
669
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
670
                cont.config.MaxSvcGraphNodes = 32
1✔
671
        }
1✔
672
        if !cont.config.ChainedMode {
2✔
673
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
674
        }
1✔
675
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
676
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
677
                privKey, apicCert, cont.config.AciPrefix,
1✔
678
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
679
                cont.config.AciVrfTenant)
1✔
680
        if err != nil {
1✔
681
                panic(err)
×
682
        }
683

684
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
685

1✔
686
        if len(cont.config.ApicHosts) != 0 {
1✔
687
        APIC_SWITCH:
×
688
                cont.log.WithFields(logrus.Fields{
×
689
                        "mod":  "APICAPI",
×
690
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
691
                }).Debug("Connecting to APIC to determine the Version")
×
692

×
693
                version, err := cont.apicConn.GetVersion()
×
694
                if err != nil {
×
695
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
696
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
697
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
698
                        goto APIC_SWITCH
×
699
                }
700
                cont.apicConn.CachedVersion = version
×
701
                apicapi.ApicVersion = version
×
702
                if version >= "4.2(4i)" {
×
703
                        cont.apicConn.SnatPbrFltrChain = true
×
704
                } else {
×
705
                        cont.apicConn.SnatPbrFltrChain = false
×
706
                }
×
707
                if version >= "5.2" {
×
708
                        cont.vmmClusterFaultSupported = true
×
709
                }
×
710
        } else { // For unit-tests
1✔
711
                cont.apicConn.SnatPbrFltrChain = true
1✔
712
        }
1✔
713

714
        if !cont.config.ChainedMode {
2✔
715
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
716
                // Make sure Pod/NodeBDs are associated with the same VRF.
1✔
717
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
718
                        var expectedVrfRelations []string
×
719
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
720
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
721
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
722
                        if err != nil {
×
723
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
724
                                panic(err)
×
725
                        }
726
                }
727
        }
728

729
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.config.ChainedMode {
1✔
730
                //Clear fault instances when the controller starts
×
731
                cont.clearFaultInstances()
×
732
                //Subscribe for vmmEpPD for a given domain
×
733
                var tnTargetFilterEpg string
×
734
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
735
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
736
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
737
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
738
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
739
                        func(obj apicapi.ApicObject) bool {
×
740
                                cont.vmmEpPDChanged(obj)
×
741
                                return true
×
742
                        },
×
743
                        func(dn string) {
×
744
                                cont.vmmEpPDDeleted(dn)
×
745
                        })
×
746
        }
747

748
        cont.initStaticObjs()
1✔
749

1✔
750
        err = cont.env.PrepareRun(stopCh)
1✔
751
        if err != nil {
1✔
752
                panic(err.Error())
×
753
        }
754

755
        cont.apicConn.FullSyncHook = func() {
1✔
756
                // put a channel into each work queue and wait on it to
×
757
                // checkpoint object syncing in response to new subscription
×
758
                // updates
×
759
                cont.log.Debug("Starting checkpoint")
×
760
                var chans []chan struct{}
×
761
                qs := make([]workqueue.RateLimitingInterface, 0)
×
762
                _, ok := cont.env.(*K8sEnvironment)
×
763
                if ok {
×
764
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
765
                        if !cont.config.ChainedMode {
×
766
                                if !cont.config.DisableHppRendering {
×
767
                                        qs = append(qs, cont.netPolQueue)
×
768
                                }
×
769
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
770
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
771
                                        cont.rdConfigQueue, cont.erspanQueue)
×
772
                        }
773
                }
774
                for _, q := range qs {
×
775
                        c := make(chan struct{})
×
776
                        chans = append(chans, c)
×
777
                        q.Add(c)
×
778
                }
×
779
                for _, c := range chans {
×
780
                        <-c
×
781
                }
×
782
                cont.log.Debug("Checkpoint complete")
×
783
        }
784

785
        if len(cont.config.ApicHosts) != 0 && !cont.config.ChainedMode {
1✔
786
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
787
                cont.scheduleRdConfig()
×
788
        }
×
789

790
        if !cont.config.ChainedMode {
2✔
791
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
792
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
793
                                []string{"hostprotPol"})
1✔
794
                }
1✔
795
        } else {
1✔
796
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
797
                        []string{"fvBD", "fvAp"})
1✔
798
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
799
                        []string{"fvnsVlanInstP"}, "")
1✔
800
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
801
                        []string{"infraRsDomP"}, "")
1✔
802
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
803
                        []string{"physDomP"}, "")
1✔
804
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
805
                        []string{"infraRsVlanNs"}, "")
1✔
806
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
807
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
808
        }
1✔
809
        if !cont.config.ChainedMode {
2✔
810
                // When a new class is added for subscription, check if its name attribute
1✔
811
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
812
                // in apicapi.go
1✔
813
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
814
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
815
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
816
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
817
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
818
                }
×
819
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
820
                        subscribeMo)
1✔
821
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
822
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
823
                        []string{"fvRsCons"})
1✔
824
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
825
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
826
                        cont.config.AciVmmController)
1✔
827
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
828
                // Since it is not supported for APIC versions < "5.0"
1✔
829
                cont.addVmmInjectedLabel()
1✔
830
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
831
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
832

1✔
833
                var tnTargetFilter string
1✔
834
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
835
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
836
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
837
                        }
×
838
                } else {
1✔
839
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
840
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
841
                }
1✔
842
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
843
                        tnTargetFilter)
1✔
844
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
845
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
846

1✔
847
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
848
                        func(obj apicapi.ApicObject) bool {
1✔
849
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
850
                                return true
×
851
                        },
×
852
                        func(dn string) {
×
853
                                cont.SubnetDeleted(dn)
×
854
                        })
×
855

856
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
857
                        []string{"opflexODev"}, "")
1✔
858

1✔
859
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
860
                        func(obj apicapi.ApicObject) bool {
1✔
861
                                cont.opflexDeviceChanged(obj)
×
862
                                return true
×
863
                        },
×
864
                        func(dn string) {
×
865
                                cont.opflexDeviceDeleted(dn)
×
866
                        })
×
867
        }
868
        go cont.apicConn.Run(stopCh)
1✔
869
}
870

871
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
872
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
873
        ticker := time.NewTicker(seconds * time.Second)
1✔
874
        defer ticker.Stop()
1✔
875

1✔
876
        for {
2✔
877
                select {
1✔
878
                case <-ticker.C:
1✔
879
                        if cont.config.EnableOpflexAgentReconnect {
1✔
880
                                cont.checkChangeOfOpflexOdevAciPod()
×
881
                        }
×
882
                        if cont.config.AciMultipod {
1✔
883
                                cont.checkChangeOfOdevAciPod()
×
884
                        }
×
885
                case <-stopCh:
1✔
886
                        return
1✔
887
                }
888
        }
889
}
890

891
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
892
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
893
        ticker := time.NewTicker(seconds * time.Second)
1✔
894
        defer ticker.Stop()
1✔
895

1✔
896
        for {
2✔
897
                select {
1✔
898
                case <-ticker.C:
1✔
899
                        cont.deleteOldOpflexDevices()
1✔
900
                case <-stopCh:
1✔
901
                        return
1✔
902
                }
903
        }
904
}
905

906
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
907
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
908
        ticker := time.NewTicker(seconds * time.Second)
1✔
909
        defer ticker.Stop()
1✔
910

1✔
911
        for {
2✔
912
                select {
1✔
913
                case <-ticker.C:
1✔
914
                        cont.processDelayedEpSlices()
1✔
915
                case <-stopCh:
1✔
916
                        return
1✔
917
                }
918
        }
919
}
920

921
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
922
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
923
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
924
        iteration := 0
1✔
925
        for {
2✔
926
                // To avoid noisy logs, only printing once in 5 minutes
1✔
927
                if iteration%5 == 0 {
2✔
928
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
929
                }
1✔
930
                var nodeInfos []*nodeinfo.NodeInfo
1✔
931
                cont.indexMutex.Lock()
1✔
932
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
933
                        func(nodeInfoObj interface{}) {
2✔
934
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
935
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
936
                        })
1✔
937
                expectedmap := make(map[string]map[string]bool)
1✔
938
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
939
                        for nodename, entry := range glinfo {
2✔
940
                                if _, found := expectedmap[nodename]; !found {
2✔
941
                                        newentry := make(map[string]bool)
1✔
942
                                        newentry[entry.SnatPolicyName] = true
1✔
943
                                        expectedmap[nodename] = newentry
1✔
944
                                } else {
2✔
945
                                        currententry := expectedmap[nodename]
1✔
946
                                        currententry[entry.SnatPolicyName] = true
1✔
947
                                        expectedmap[nodename] = currententry
1✔
948
                                }
1✔
949
                        }
950
                }
951
                cont.indexMutex.Unlock()
1✔
952

1✔
953
                for _, value := range nodeInfos {
2✔
954
                        marked := false
1✔
955
                        policyNames := value.Spec.SnatPolicyNames
1✔
956
                        nodeName := value.ObjectMeta.Name
1✔
957
                        _, ok := expectedmap[nodeName]
1✔
958
                        if !ok && len(policyNames) > 0 {
2✔
959
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
960
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
961
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
962
                                marked = true
1✔
963
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
964
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
965
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
966
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
967
                                marked = true
1✔
968
                        } else {
2✔
969
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
970
                                        // No snatpolicies present
×
971
                                        continue
×
972
                                }
973
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
974
                                if !eq {
2✔
975
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
976
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
977
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
978
                                        marked = true
1✔
979
                                }
1✔
980
                        }
981
                        if marked {
2✔
982
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
983
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
984
                                if err != nil {
1✔
985
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
986
                                        continue
×
987
                                }
988
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
989
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
990
                        } else if iteration%5 == 0 {
2✔
991
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
992
                        }
1✔
993
                }
994
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
995
                iteration++
1✔
996
        }
997
}
998

999
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1000
        queueStop <-chan struct{}) {
1✔
1001
        go wait.Until(func() {
2✔
1002
                for {
2✔
1003
                        syncType, quit := queue.Get()
1✔
1004
                        if quit {
2✔
1005
                                break
1✔
1006
                        }
1007
                        var requeue bool
1✔
1008
                        if sType, ok := syncType.(string); ok {
2✔
1009
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1010
                                        requeue = f()
1✔
1011
                                }
1✔
1012
                        }
1013
                        if requeue {
1✔
1014
                                queue.AddRateLimited(syncType)
×
1015
                        } else {
1✔
1016
                                queue.Forget(syncType)
1✔
1017
                        }
1✔
1018
                        queue.Done(syncType)
1✔
1019
                }
1020
        }, time.Second, queueStop)
1021
        <-queueStop
1✔
1022
        queue.ShutDown()
1✔
1023
}
1024

1025
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1026
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1027
}
1✔
1028
func (cont *AciController) scheduleRdConfig() {
×
1029
        cont.syncQueue.AddRateLimited("rdConfig")
×
1030
}
×
1031
func (cont *AciController) scheduleCreateIstioCR() {
×
1032
        cont.syncQueue.AddRateLimited("istioCR")
×
1033
}
×
1034

1035
func (cont *AciController) addVmmInjectedLabel() {
1✔
1036
        if apicapi.ApicVersion >= "5.2" {
1✔
1037
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1038
                if err != nil {
×
1039
                        panic(err.Error())
×
1040
                }
1041
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1042
                if err != nil {
×
1043
                        panic(err.Error())
×
1044
                }
1045
        }
1046
        if apicapi.ApicVersion >= "5.0" {
2✔
1047
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1048
                if err != nil {
1✔
1049
                        panic(err.Error())
×
1050
                }
1051
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1052
                if err != nil {
1✔
1053
                        panic(err.Error())
×
1054
                }
1055
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1056
                if err != nil {
1✔
1057
                        panic(err.Error())
×
1058
                }
1059
        }
1060
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc