• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 8852

21 Mar 2024 03:12PM UTC coverage: 70.881% (-0.4%) from 71.311%
8852

Pull #1287

travis-pro

akhilamohanan
Do VLAN programming upfront for OpenShift on OpenStack

For OpenShift on OpenStack clusters, vnsRsCIfPathAtt is created for each
OpenStack compute hosts along with OpenShift nodes so that VLAN will
already be there when a vm is migrated from one compute host to other.

The change is done to avoid datapath issues after vm migration.
Pull Request #1287: Do VLAN programming upfront for OpenShift on OpenStack

27 of 120 new or added lines in 2 files covered. (22.5%)

175 existing lines in 4 files now uncovered.

10703 of 15100 relevant lines covered (70.88%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.09
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        "k8s.io/apimachinery/pkg/labels"
35
        "k8s.io/apimachinery/pkg/util/wait"
36
        "k8s.io/client-go/kubernetes"
37
        "k8s.io/client-go/tools/cache"
38
        "k8s.io/client-go/util/workqueue"
39

40
        "github.com/noironetworks/aci-containers/pkg/apicapi"
41
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
42
        "github.com/noironetworks/aci-containers/pkg/index"
43
        "github.com/noironetworks/aci-containers/pkg/ipam"
44
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
45
        "github.com/noironetworks/aci-containers/pkg/metadata"
46
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
47
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
48
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
49
        "github.com/noironetworks/aci-containers/pkg/util"
50
)
51

52
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
53
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
54
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
55

56
type AciController struct {
57
        log    *logrus.Logger
58
        config *ControllerConfig
59
        env    Environment
60

61
        defaultEg string
62
        defaultSg string
63

64
        unitTestMode bool
65

66
        podQueue            workqueue.RateLimitingInterface
67
        netPolQueue         workqueue.RateLimitingInterface
68
        qosQueue            workqueue.RateLimitingInterface
69
        serviceQueue        workqueue.RateLimitingInterface
70
        snatQueue           workqueue.RateLimitingInterface
71
        netflowQueue        workqueue.RateLimitingInterface
72
        erspanQueue         workqueue.RateLimitingInterface
73
        snatNodeInfoQueue   workqueue.RateLimitingInterface
74
        rdConfigQueue       workqueue.RateLimitingInterface
75
        istioQueue          workqueue.RateLimitingInterface
76
        nodeFabNetAttQueue  workqueue.RateLimitingInterface
77
        netFabConfigQueue   workqueue.RateLimitingInterface
78
        nadVlanMapQueue     workqueue.RateLimitingInterface
79
        fabricVlanPoolQueue workqueue.RateLimitingInterface
80

81
        namespaceIndexer       cache.Indexer
82
        namespaceInformer      cache.Controller
83
        podIndexer             cache.Indexer
84
        podInformer            cache.Controller
85
        endpointsIndexer       cache.Indexer
86
        endpointsInformer      cache.Controller
87
        serviceIndexer         cache.Indexer
88
        serviceInformer        cache.Controller
89
        replicaSetIndexer      cache.Indexer
90
        replicaSetInformer     cache.Controller
91
        deploymentIndexer      cache.Indexer
92
        deploymentInformer     cache.Controller
93
        nodeIndexer            cache.Indexer
94
        nodeInformer           cache.Controller
95
        networkPolicyIndexer   cache.Indexer
96
        networkPolicyInformer  cache.Controller
97
        snatIndexer            cache.Indexer
98
        snatInformer           cache.Controller
99
        snatNodeInfoIndexer    cache.Indexer
100
        snatNodeInformer       cache.Controller
101
        crdInformer            cache.Controller
102
        rdConfigInformer       cache.Controller
103
        rdConfigIndexer        cache.Indexer
104
        qosIndexer             cache.Indexer
105
        qosInformer            cache.Controller
106
        netflowIndexer         cache.Indexer
107
        netflowInformer        cache.Controller
108
        erspanIndexer          cache.Indexer
109
        erspanInformer         cache.Controller
110
        nodePodIfIndexer       cache.Indexer
111
        nodePodIfInformer      cache.Controller
112
        istioIndexer           cache.Indexer
113
        istioInformer          cache.Controller
114
        endpointSliceIndexer   cache.Indexer
115
        endpointSliceInformer  cache.Controller
116
        snatCfgInformer        cache.Controller
117
        updatePod              podUpdateFunc
118
        updateNode             nodeUpdateFunc
119
        updateServiceStatus    serviceUpdateFunc
120
        nodeFabNetAttInformer  cache.SharedIndexInformer
121
        netFabConfigInformer   cache.SharedIndexInformer
122
        nadVlanMapInformer     cache.SharedIndexInformer
123
        fabricVlanPoolInformer cache.SharedIndexInformer
124

125
        indexMutex sync.Mutex
126

127
        configuredPodNetworkIps *netIps
128
        podNetworkIps           *netIps
129
        serviceIps              *ipam.IpCache
130
        staticServiceIps        *netIps
131
        nodeServiceIps          *netIps
132

133
        // index of pods matched by deployments
134
        depPods *index.PodSelectorIndex
135
        // index of pods matched by network policies
136
        netPolPods *index.PodSelectorIndex
137
        // index of pods matched by network policy ingress rules
138
        netPolIngressPods *index.PodSelectorIndex
139
        // index of pods matched by network policy egress rules
140
        netPolEgressPods *index.PodSelectorIndex
141
        // index of IP addresses contained in endpoints objects
142
        endpointsIpIndex cidranger.Ranger
143
        // index of service target ports
144
        targetPortIndex map[string]*portIndexEntry
145
        // index of ip blocks referenced by network policy egress rules
146
        netPolSubnetIndex cidranger.Ranger
147
        // index of pods matched by erspan policies
148
        erspanPolPods *index.PodSelectorIndex
149

150
        apicConn *apicapi.ApicConnection
151

152
        nodeServiceMetaCache map[string]*nodeServiceMeta
153
        nodeACIPod           map[string]aciPodAnnot
154
        nodeACIPodAnnot      map[string]aciPodAnnot
155
        nodeOpflexDevice     map[string]apicapi.ApicSlice
156
        nodePodNetCache      map[string]*nodePodNetMeta
157
        serviceMetaCache     map[string]*serviceMeta
158
        snatPolicyCache      map[string]*ContSnatPolicy
159
        delayedEpSlices      []*DelayedEpSlice
160
        snatServices         map[string]bool
161
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
162
        rdConfigCache        map[string]*rdConfig.RdConfig
163
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
164
        istioCache           map[string]*istiov1.AciIstioOperator
165
        podIftoEp            map[string]*EndPointData
166
        // Node Name and Policy Name
167
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
168
        nodeSyncEnabled     bool
169
        serviceSyncEnabled  bool
170
        snatSyncEnabled     bool
171
        tunnelGetter        *tunnelState
172
        syncQueue           workqueue.RateLimitingInterface
173
        syncProcessors      map[string]func() bool
174
        serviceEndPoints    ServiceEndPointType
175
        crdHandlers         map[string]func(*AciController, <-chan struct{})
176
        stopCh              <-chan struct{}
177
        //index of containerportname to ctrPortNameEntry
178
        ctrPortNameCache map[string]*ctrPortNameEntry
179
        // named networkPolicies
180
        nmPortNp map[string]bool
181
        //maps network policy hash to hpp
182
        hppRef map[string]hppReference
183
        // cache to look for Epg DNs which are bound to Vmm domain
184
        cachedEpgDns             []string
185
        vmmClusterFaultSupported bool
186
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
187
        //Used in Shared mode
188
        sharedEncapCache map[int]*sharedEncapData
189
        // vlan to propertiesList
190
        sharedEncapNfcCache    map[int]*NfcData
191
        sharedEncapNfcVlanMap  map[int]*NfcData
192
        sharedEncapNfcLabelMap map[string]*NfcData
193
        // nadVlanMap encapLabel to vlan
194
        sharedEncapLabelMap      map[string][]int
195
        lldpIfCache              map[string]string
196
        globalVlanConfig         globalVlanConfig
197
        fabricVlanPoolMap        map[string]map[string]string
198
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
199
        openStackSystemId        string
200
}
201

202
type NfcData struct {
203
        Aeps map[string]bool
204
        Epg  fabattv1.Epg
205
}
206

207
type sharedEncapData struct {
208
        //node to NAD to pods
209
        Pods   map[string]map[string][]string
210
        NetRef map[string]*AdditionalNetworkMeta
211
}
212

213
type globalVlanConfig struct {
214
        SharedPhysDom apicapi.ApicObject
215
}
216

217
type hppReference struct {
218
        RefCount uint              `json:"ref-count,omitempty"`
219
        Npkeys   []string          `json:"npkeys,omitempty"`
220
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
221
}
222

223
type DelayedEpSlice struct {
224
        ServiceKey  string
225
        OldEpSlice  *discovery.EndpointSlice
226
        NewEpSlice  *discovery.EndpointSlice
227
        DelayedTime time.Time
228
}
229

230
type aciPodAnnot struct {
231
        aciPod             string
232
        isSingleOpflexOdev bool
233
        disconnectTime     time.Time
234
        connectTime        time.Time
235
}
236

237
type nodeServiceMeta struct {
238
        serviceEp metadata.ServiceEndpoint
239
}
240

241
type nodePodNetMeta struct {
242
        nodePods            map[string]bool
243
        podNetIps           metadata.NetIps
244
        podNetIpsAnnotation string
245
}
246

247
type openstackOpflexOdevInfo struct {
248
        opflexODevDn map[string]struct{}
249
        fabricPathDn string
250
}
251

252
type serviceMeta struct {
253
        requestedIps     []net.IP
254
        ingressIps       []net.IP
255
        staticIngressIps []net.IP
256
}
257

258
type ipIndexEntry struct {
259
        ipNet net.IPNet
260
        keys  map[string]bool
261
}
262

263
type targetPort struct {
264
        proto v1.Protocol
265
        ports []int
266
}
267

268
type portIndexEntry struct {
269
        port              targetPort
270
        serviceKeys       map[string]bool
271
        networkPolicyKeys map[string]bool
272
}
273

274
type portRangeSnat struct {
275
        start int
276
        end   int
277
}
278

279
// EndPointData holds PodIF data in controller.
280
type EndPointData struct {
281
        MacAddr    string
282
        EPG        string
283
        Namespace  string
284
        AppProfile string
285
}
286

287
type ctrPortNameEntry struct {
288
        // Proto+port->pods
289
        ctrNmpToPods map[string]map[string]bool
290
}
291

292
type LinkData struct {
293
        Link []string
294
        Pods []string
295
}
296

297
type AdditionalNetworkMeta struct {
298
        NetworkName string
299
        EncapVlan   string
300
        //node+localiface->fabricLinks
301
        FabricLink map[string]map[string]LinkData
302
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
303
        Mode       util.EncapMode
304
}
305

306
type ServiceEndPointType interface {
307
        InitClientInformer(kubeClient *kubernetes.Clientset)
308
        Run(stopCh <-chan struct{})
309
        Wait(stopCh <-chan struct{})
310
        UpdateServicesForNode(nodename string)
311
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
312
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
313
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
314
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
315
}
316

317
type serviceEndpoint struct {
318
        cont *AciController
319
}
320
type serviceEndpointSlice struct {
321
        cont *AciController
322
}
323

324
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
325
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
326
}
×
327

328
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
329
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
330
}
×
331

332
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
333
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
334
}
1✔
335

336
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
337
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
338
}
1✔
339

340
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
341
        cache.WaitForCacheSync(stopCh,
1✔
342
                sep.cont.endpointsInformer.HasSynced,
1✔
343
                sep.cont.serviceInformer.HasSynced)
1✔
344
}
1✔
345

346
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
347
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
348
        cache.WaitForCacheSync(stopCh,
1✔
349
                seps.cont.endpointSliceInformer.HasSynced,
1✔
350
                seps.cont.serviceInformer.HasSynced)
1✔
351
}
1✔
352

353
func (e *ipIndexEntry) Network() net.IPNet {
1✔
354
        return e.ipNet
1✔
355
}
1✔
356

357
func newNodePodNetMeta() *nodePodNetMeta {
1✔
358
        return &nodePodNetMeta{
1✔
359
                nodePods: make(map[string]bool),
1✔
360
        }
1✔
361
}
1✔
362

363
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
364
        return workqueue.NewNamedRateLimitingQueue(
1✔
365
                workqueue.NewMaxOfRateLimiter(
1✔
366
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
367
                                10*time.Second),
1✔
368
                        &workqueue.BucketRateLimiter{
1✔
369
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
370
                        },
1✔
371
                ),
1✔
372
                "delta")
1✔
373
}
1✔
374

375
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
376
        cont := &AciController{
1✔
377
                log:          log,
1✔
378
                config:       config,
1✔
379
                env:          env,
1✔
380
                defaultEg:    "",
1✔
381
                defaultSg:    "",
1✔
382
                unitTestMode: unittestmode,
1✔
383

1✔
384
                podQueue:            createQueue("pod"),
1✔
385
                netPolQueue:         createQueue("networkPolicy"),
1✔
386
                qosQueue:            createQueue("qos"),
1✔
387
                netflowQueue:        createQueue("netflow"),
1✔
388
                erspanQueue:         createQueue("erspan"),
1✔
389
                serviceQueue:        createQueue("service"),
1✔
390
                snatQueue:           createQueue("snat"),
1✔
391
                snatNodeInfoQueue:   createQueue("snatnodeinfo"),
1✔
392
                rdConfigQueue:       createQueue("rdconfig"),
1✔
393
                istioQueue:          createQueue("istio"),
1✔
394
                nodeFabNetAttQueue:  createQueue("nodefabricnetworkattachment"),
1✔
395
                netFabConfigQueue:   createQueue("networkfabricconfiguration"),
1✔
396
                nadVlanMapQueue:     createQueue("nadvlanmap"),
1✔
397
                fabricVlanPoolQueue: createQueue("fabricvlanpool"),
1✔
398
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
399
                        &workqueue.BucketRateLimiter{
1✔
400
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
401
                        }, "sync"),
1✔
402

1✔
403
                configuredPodNetworkIps: newNetIps(),
1✔
404
                podNetworkIps:           newNetIps(),
1✔
405
                serviceIps:              ipam.NewIpCache(),
1✔
406
                staticServiceIps:        newNetIps(),
1✔
407
                nodeServiceIps:          newNetIps(),
1✔
408

1✔
409
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
410
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
411
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
412

1✔
413
                nodeServiceMetaCache:     make(map[string]*nodeServiceMeta),
1✔
414
                nodePodNetCache:          make(map[string]*nodePodNetMeta),
1✔
415
                serviceMetaCache:         make(map[string]*serviceMeta),
1✔
416
                snatPolicyCache:          make(map[string]*ContSnatPolicy),
1✔
417
                snatServices:             make(map[string]bool),
1✔
418
                snatNodeInfoCache:        make(map[string]*nodeinfo.NodeInfo),
1✔
419
                rdConfigCache:            make(map[string]*rdConfig.RdConfig),
1✔
420
                rdConfigSubnetCache:      make(map[string]*rdConfig.RdConfigSpec),
1✔
421
                podIftoEp:                make(map[string]*EndPointData),
1✔
422
                snatGlobalInfoCache:      make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
423
                istioCache:               make(map[string]*istiov1.AciIstioOperator),
1✔
424
                crdHandlers:              make(map[string]func(*AciController, <-chan struct{})),
1✔
425
                ctrPortNameCache:         make(map[string]*ctrPortNameEntry),
1✔
426
                nmPortNp:                 make(map[string]bool),
1✔
427
                hppRef:                   make(map[string]hppReference),
1✔
428
                additionalNetworkCache:   make(map[string]*AdditionalNetworkMeta),
1✔
429
                sharedEncapCache:         make(map[int]*sharedEncapData),
1✔
430
                sharedEncapNfcCache:      make(map[int]*NfcData),
1✔
431
                sharedEncapNfcVlanMap:    make(map[int]*NfcData),
1✔
432
                sharedEncapNfcLabelMap:   make(map[string]*NfcData),
1✔
433
                sharedEncapLabelMap:      make(map[string][]int),
1✔
434
                lldpIfCache:              make(map[string]string),
1✔
435
                fabricVlanPoolMap:        make(map[string]map[string]string),
1✔
436
                openStackFabricPathDnMap: make(map[string]openstackOpflexOdevInfo),
1✔
437
        }
1✔
438
        cont.syncProcessors = map[string]func() bool{
1✔
439
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
440
                "rdConfig":       cont.syncRdConfig,
1✔
441
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
442
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
443
                   and aci-containers-operator images for istio.io/istio package
1✔
444

1✔
445
                "istioCR":        cont.createIstioCR,
1✔
446
                */
1✔
447
        }
1✔
448
        return cont
1✔
449
}
1✔
450

451
func (cont *AciController) Init() {
×
452
        if cont.config.ChainedMode {
×
453
                cont.log.Info("In chained mode")
×
454
        }
×
455
        if cont.config.LBType != lbTypeAci && !cont.config.ChainedMode {
×
456
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedNwPol")
×
457
                if err != nil {
×
458
                        panic(err.Error())
×
459
                }
460
        }
461

462
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
463
        if err != nil {
×
464
                cont.log.Error("Could not serialize default endpoint group")
×
465
                panic(err.Error())
×
466
        }
467
        cont.defaultEg = string(egdata)
×
468

×
469
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
470
        if err != nil {
×
471
                cont.log.Error("Could not serialize default security groups")
×
472
                panic(err.Error())
×
473
        }
474
        cont.defaultSg = string(sgdata)
×
475

×
476
        cont.log.Debug("Initializing IPAM")
×
477
        cont.initIpam()
×
478
        // check if the cluster supports endpoint slices
×
479
        // if cluster doesn't have the support fallback to endpoints
×
480
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
481
        if util.IsEndPointSlicesSupported(kubeClient) {
×
482
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
483
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
484
                cont.log.Info("Initializing ServiceEndpointSlices")
×
485
        } else {
×
486
                cont.serviceEndPoints = &serviceEndpoint{}
×
487
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
488
                cont.log.Info("Initializing ServiceEndpoints")
×
489
        }
×
490

491
        err = cont.env.Init(cont)
×
492
        if err != nil {
×
493
                panic(err.Error())
×
494
        }
495
}
496

497
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
498
        store cache.Store, handler func(interface{}) bool,
499
        deleteHandler func(string) bool,
500
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
501
        go wait.Until(func() {
2✔
502
                for {
2✔
503
                        key, quit := queue.Get()
1✔
504
                        if quit {
2✔
505
                                break
1✔
506
                        }
507

508
                        var requeue bool
1✔
509
                        switch key := key.(type) {
1✔
510
                        case chan struct{}:
×
511
                                close(key)
×
512
                        case string:
1✔
513
                                if strings.HasPrefix(key, "DELETED_") {
2✔
514
                                        delKey := strings.Trim(key, "DELETED_")
1✔
515
                                        requeue = deleteHandler(delKey)
1✔
516
                                } else {
2✔
517
                                        obj, exists, err := store.GetByKey(key)
1✔
518
                                        if err != nil {
1✔
519
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
520
                                        }
×
521
                                        //Handle Add/Update/Delete
522
                                        if exists && handler != nil {
2✔
523
                                                requeue = handler(obj)
1✔
524
                                        }
1✔
525
                                        //Handle Post Delete
526
                                        if !exists && postDelHandler != nil {
1✔
527
                                                requeue = postDelHandler()
×
528
                                        }
×
529
                                }
530
                        }
531
                        if requeue {
2✔
532
                                queue.AddRateLimited(key)
1✔
533
                        } else {
2✔
534
                                queue.Forget(key)
1✔
535
                        }
1✔
536
                        queue.Done(key)
1✔
537
                }
538
        }, time.Second, stopCh)
539
        <-stopCh
1✔
540
        queue.ShutDown()
1✔
541
}
542

543
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
544
        return apicapi.ApicSlice{}
1✔
545
}
1✔
546

547
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
548
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
549
}
1✔
550

551
func (cont *AciController) initStaticObjs() {
1✔
552
        cont.env.InitStaticAciObjects()
1✔
553
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
554
                cont.globalStaticObjs())
1✔
555
}
1✔
556

557
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
558
        vmmProv = "Kubernetes"
1✔
559
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
560
                vmmProv = "OpenShift"
×
561
        }
×
562
        return
1✔
563
}
564

565
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
566
        var err error
1✔
567
        var privKey []byte
1✔
568
        var apicCert []byte
1✔
569

1✔
570
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
571

1✔
572
        if cont.config.ApicPrivateKeyPath != "" {
1✔
573
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
574
                if err != nil {
×
575
                        panic(err)
×
576
                }
577
        }
578
        if cont.config.ApicCertPath != "" {
1✔
579
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
580
                if err != nil {
×
581
                        panic(err)
×
582
                }
583
        }
584
        // If not defined, default is 1800
585
        if cont.config.ApicRefreshTimer == "" {
2✔
586
                cont.config.ApicRefreshTimer = "1800"
1✔
587
        }
1✔
588
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
589
        if err != nil {
1✔
590
                panic(err)
×
591
        }
592
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
593

1✔
594
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
595
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
596
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
597
                panic(err)
×
598
        }
599

600
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
601
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
602
                cont.config.ApicRefreshTickerAdjust = "150"
1✔
603
        }
1✔
604
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
605
        if err != nil {
1✔
606
                panic(err)
×
607
        }
608

609
        //If ApicSubscriptionDelay is not defined, default to 100ms
610
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
611
                cont.config.ApicSubscriptionDelay = 100
1✔
612
        }
1✔
613
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
614

1✔
615
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
616
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
617
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
618
        }
1✔
619

620
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
621
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
622
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
623
        }
1✔
624
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
625

1✔
626
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
627
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
628
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
629
        }
1✔
630

631
        // If not defined, default to 32
632
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
633
                cont.config.PodIpPoolChunkSize = 32
1✔
634
        }
1✔
635
        if !cont.config.ChainedMode {
2✔
636
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
637
        }
1✔
638

639
        // If ApicConnectionRetryLimit is not defined, default to 5
640
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
641
                cont.config.ApicConnectionRetryLimit = 5
1✔
642
        }
1✔
643
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
644

1✔
645
        // If not valid, default to 5000-65000
1✔
646
        // other permissible values 1-65000
1✔
647
        defStart := 5000
1✔
648
        defEnd := 65000
1✔
649
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
650
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
651
        }
1✔
652
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
653
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
654
        }
1✔
655
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
656
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
657
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
658
                cont.config.SnatDefaultPortRangeStart = defStart
×
659
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
660
        }
×
661

662
        // Set default value for pbr programming delay if services list is not empty
663
        // and delay value is empty
664
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
665
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
666
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
667
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
668
        }
×
669
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
670
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
671
        }
×
672

673
        // Set contract scope for snat svc graph to global by default
674
        if cont.config.SnatSvcContractScope == "" {
2✔
675
                cont.config.SnatSvcContractScope = "global"
1✔
676
        }
1✔
677
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
678
                cont.config.MaxSvcGraphNodes = 32
1✔
679
        }
1✔
680
        if !cont.config.ChainedMode {
2✔
681
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
682
        }
1✔
683
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
684
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
685
                privKey, apicCert, cont.config.AciPrefix,
1✔
686
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
687
                cont.config.AciVrfTenant)
1✔
688
        if err != nil {
1✔
689
                panic(err)
×
690
        }
691

692
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
693

1✔
694
        if len(cont.config.ApicHosts) != 0 {
1✔
695
        APIC_SWITCH:
×
696
                cont.log.WithFields(logrus.Fields{
×
697
                        "mod":  "APICAPI",
×
698
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
699
                }).Debug("Connecting to APIC to determine the Version")
×
700

×
701
                version, err := cont.apicConn.GetVersion()
×
702
                if err != nil {
×
703
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
704
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
705
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
706
                        goto APIC_SWITCH
×
707
                }
708
                cont.apicConn.CachedVersion = version
×
709
                apicapi.ApicVersion = version
×
710
                if version >= "4.2(4i)" {
×
711
                        cont.apicConn.SnatPbrFltrChain = true
×
712
                } else {
×
713
                        cont.apicConn.SnatPbrFltrChain = false
×
714
                }
×
715
                if version >= "5.2" {
×
716
                        cont.vmmClusterFaultSupported = true
×
717
                }
×
718
        } else { // For unit-tests
1✔
719
                cont.apicConn.SnatPbrFltrChain = true
1✔
720
        }
1✔
721

722
        if !cont.config.ChainedMode {
2✔
723
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
724
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
725
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
726
                        var expectedVrfRelations []string
×
727
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
728
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
729
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
730
                        if err != nil {
×
731
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
732
                                panic(err)
×
733
                        }
734
                }
735
        }
736

737
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.config.ChainedMode {
1✔
738
                //Clear fault instances when the controller starts
×
739
                cont.clearFaultInstances()
×
740
                //Subscribe for vmmEpPD for a given domain
×
741
                var tnTargetFilterEpg string
×
742
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
743
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
744
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
745
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
746
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
747
                        func(obj apicapi.ApicObject) bool {
×
748
                                cont.vmmEpPDChanged(obj)
×
749
                                return true
×
750
                        },
×
751
                        func(dn string) {
×
752
                                cont.vmmEpPDDeleted(dn)
×
753
                        })
×
754
        }
755

756
        cont.initStaticObjs()
1✔
757

1✔
758
        err = cont.env.PrepareRun(stopCh)
1✔
759
        if err != nil {
1✔
760
                panic(err.Error())
×
761
        }
762

763
        cont.apicConn.FullSyncHook = func() {
1✔
764
                // put a channel into each work queue and wait on it to
×
765
                // checkpoint object syncing in response to new subscription
×
766
                // updates
×
767
                cont.log.Debug("Starting checkpoint")
×
768
                var chans []chan struct{}
×
769
                qs := make([]workqueue.RateLimitingInterface, 0)
×
770
                _, ok := cont.env.(*K8sEnvironment)
×
771
                if ok {
×
772
                        if !cont.config.ChainedMode {
×
773
                                qs = []workqueue.RateLimitingInterface{
×
774
                                        cont.podQueue, cont.netPolQueue, cont.qosQueue,
×
775
                                        cont.serviceQueue, cont.snatQueue, cont.netflowQueue,
×
776
                                        cont.snatNodeInfoQueue, cont.rdConfigQueue, cont.erspanQueue,
×
777
                                }
×
778
                        } else {
×
779
                                qs = []workqueue.RateLimitingInterface{
×
UNCOV
780
                                        cont.podQueue,
×
UNCOV
781
                                }
×
782
                        }
×
783
                }
784
                for _, q := range qs {
×
785
                        c := make(chan struct{})
×
786
                        chans = append(chans, c)
×
787
                        q.Add(c)
×
788
                }
×
789
                for _, c := range chans {
×
790
                        <-c
×
UNCOV
791
                }
×
UNCOV
792
                cont.log.Debug("Checkpoint complete")
×
793
        }
794

795
        if len(cont.config.ApicHosts) != 0 && !cont.config.ChainedMode {
1✔
796
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
UNCOV
797
                cont.scheduleRdConfig()
×
NEW
798
                if strings.Contains(cont.config.Flavor, "openstack") {
×
NEW
799
                        cont.setOpenStackSystemId()
×
NEW
800
                }
×
801
        }
802

803
        if !cont.config.ChainedMode {
2✔
804
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
805
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
806
                                []string{"hostprotPol"})
1✔
807
                }
1✔
808
        } else {
1✔
809
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
810
                        []string{"fvBD", "fvAp"})
1✔
811
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
812
                        []string{"fvnsVlanInstP"}, "")
1✔
813
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
814
                        []string{"infraRsDomP"}, "")
1✔
815
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
816
                        []string{"physDomP"}, "")
1✔
817
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
818
                        []string{"infraRsVlanNs"}, "")
1✔
819
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
820
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
821
        }
1✔
822
        if !cont.config.ChainedMode {
2✔
823
                // When a new class is added for subscriptio, check if its name attribute
1✔
824
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
825
                // in apicapi.go
1✔
826
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
827
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
828
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
829
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
UNCOV
830
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
UNCOV
831
                }
×
832
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
833
                        subscribeMo)
1✔
834
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
835
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
836
                        []string{"fvRsCons"})
1✔
837
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
838
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
839
                        cont.config.AciVmmController)
1✔
840
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
841
                // Since it is not supported for APIC versions < "5.0"
1✔
842
                cont.addVmmInjectedLabel()
1✔
843
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
844
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
845

1✔
846
                var tnTargetFilter string
1✔
847
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
848
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
UNCOV
849
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
UNCOV
850
                        }
×
851
                } else {
1✔
852
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
853
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
854
                }
1✔
855
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
856
                        tnTargetFilter)
1✔
857
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
858
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
859

1✔
860
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
861
                        func(obj apicapi.ApicObject) bool {
1✔
862
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
863
                                return true
×
864
                        },
×
865
                        func(dn string) {
×
UNCOV
866
                                cont.SubnetDeleted(dn)
×
UNCOV
867
                        })
×
868

869
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
870
                        []string{"opflexODev"}, "")
1✔
871

1✔
872
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
873
                        func(obj apicapi.ApicObject) bool {
1✔
874
                                cont.opflexDeviceChanged(obj)
×
875
                                return true
×
876
                        },
×
877
                        func(dn string) {
×
UNCOV
878
                                cont.opflexDeviceDeleted(dn)
×
UNCOV
879
                        })
×
880
        }
881
        go cont.apicConn.Run(stopCh)
1✔
882
}
883

884
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
885
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
886
        ticker := time.NewTicker(seconds * time.Second)
1✔
887
        defer ticker.Stop()
1✔
888

1✔
889
        for {
2✔
890
                select {
1✔
891
                case <-ticker.C:
1✔
892
                        if cont.config.EnableOpflexAgentReconnect {
1✔
UNCOV
893
                                cont.checkChangeOfOpflexOdevAciPod()
×
894
                        }
×
895
                        if cont.config.AciMultipod {
1✔
UNCOV
896
                                cont.checkChangeOfOdevAciPod()
×
UNCOV
897
                        }
×
898
                case <-stopCh:
1✔
899
                        return
1✔
900
                }
901
        }
902
}
903

904
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
905
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
906
        ticker := time.NewTicker(seconds * time.Second)
1✔
907
        defer ticker.Stop()
1✔
908

1✔
909
        for {
2✔
910
                select {
1✔
911
                case <-ticker.C:
1✔
912
                        cont.deleteOldOpflexDevices()
1✔
913
                case <-stopCh:
1✔
914
                        return
1✔
915
                }
916
        }
917
}
918

919
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
920
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
921
        ticker := time.NewTicker(seconds * time.Second)
1✔
922
        defer ticker.Stop()
1✔
923

1✔
924
        for {
2✔
925
                select {
1✔
926
                case <-ticker.C:
1✔
927
                        cont.processDelayedEpSlices()
1✔
928
                case <-stopCh:
1✔
929
                        return
1✔
930
                }
931
        }
932
}
933

934
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
935
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
936
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
937
        iteration := 0
1✔
938
        for {
2✔
939
                // To avoid noisy logs, only printing once in 5 minutes
1✔
940
                if iteration%5 == 0 {
2✔
941
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
942
                }
1✔
943
                var nodeInfos []*nodeinfo.NodeInfo
1✔
944
                cont.indexMutex.Lock()
1✔
945
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
946
                        func(nodeInfoObj interface{}) {
2✔
947
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
948
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
949
                        })
1✔
950
                expectedmap := make(map[string]map[string]bool)
1✔
951
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
952
                        for nodename, entry := range glinfo {
2✔
953
                                if _, found := expectedmap[nodename]; !found {
2✔
954
                                        newentry := make(map[string]bool)
1✔
955
                                        newentry[entry.SnatPolicyName] = true
1✔
956
                                        expectedmap[nodename] = newentry
1✔
957
                                } else {
2✔
958
                                        currententry := expectedmap[nodename]
1✔
959
                                        currententry[entry.SnatPolicyName] = true
1✔
960
                                        expectedmap[nodename] = currententry
1✔
961
                                }
1✔
962
                        }
963
                }
964
                cont.indexMutex.Unlock()
1✔
965

1✔
966
                for _, value := range nodeInfos {
2✔
967
                        marked := false
1✔
968
                        policyNames := value.Spec.SnatPolicyNames
1✔
969
                        nodeName := value.ObjectMeta.Name
1✔
970
                        _, ok := expectedmap[nodeName]
1✔
971
                        if !ok && len(policyNames) > 0 {
2✔
972
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
973
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
974
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
975
                                marked = true
1✔
976
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
977
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
978
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
979
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
980
                                marked = true
1✔
981
                        } else {
2✔
982
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
UNCOV
983
                                        // No snatpolicies present
×
UNCOV
984
                                        continue
×
985
                                }
986
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
987
                                if !eq {
2✔
988
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
989
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
990
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
991
                                        marked = true
1✔
992
                                }
1✔
993
                        }
994
                        if marked {
2✔
995
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
996
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
997
                                if err != nil {
1✔
UNCOV
998
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
UNCOV
999
                                        continue
×
1000
                                }
1001
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1002
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1003
                        } else if iteration%5 == 0 {
2✔
1004
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1005
                        }
1✔
1006
                }
1007
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1008
                iteration++
1✔
1009
        }
1010
}
1011

1012
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1013
        queueStop <-chan struct{}) {
1✔
1014
        go wait.Until(func() {
2✔
1015
                for {
2✔
1016
                        syncType, quit := queue.Get()
1✔
1017
                        if quit {
2✔
1018
                                break
1✔
1019
                        }
1020
                        var requeue bool
1✔
1021
                        if sType, ok := syncType.(string); ok {
2✔
1022
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1023
                                        requeue = f()
1✔
1024
                                }
1✔
1025
                        }
1026
                        if requeue {
1✔
UNCOV
1027
                                queue.AddRateLimited(syncType)
×
1028
                        } else {
1✔
1029
                                queue.Forget(syncType)
1✔
1030
                        }
1✔
1031
                        queue.Done(syncType)
1✔
1032
                }
1033
        }, time.Second, queueStop)
1034
        <-queueStop
1✔
1035
        queue.ShutDown()
1✔
1036
}
1037

1038
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1039
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1040
}
1✔
1041
func (cont *AciController) scheduleRdConfig() {
×
1042
        cont.syncQueue.AddRateLimited("rdConfig")
×
1043
}
×
1044
func (cont *AciController) scheduleCreateIstioCR() {
×
UNCOV
1045
        cont.syncQueue.AddRateLimited("istioCR")
×
UNCOV
1046
}
×
1047

1048
func (cont *AciController) addVmmInjectedLabel() {
1✔
1049
        if apicapi.ApicVersion >= "5.2" {
1✔
1050
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
UNCOV
1051
                if err != nil {
×
1052
                        panic(err.Error())
×
1053
                }
1054
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
UNCOV
1055
                if err != nil {
×
UNCOV
1056
                        panic(err.Error())
×
1057
                }
1058
        }
1059
        if apicapi.ApicVersion >= "5.0" {
2✔
1060
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1061
                if err != nil {
1✔
UNCOV
1062
                        panic(err.Error())
×
1063
                }
1064
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1065
                if err != nil {
1✔
UNCOV
1066
                        panic(err.Error())
×
1067
                }
1068
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1069
                if err != nil {
1✔
UNCOV
1070
                        panic(err.Error())
×
1071
                }
1072
        }
1073
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc