• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 10542

08 Apr 2025 05:34PM UTC coverage: 69.045% (+0.01%) from 69.032%
10542

push

travis-pro

web-flow
Merge pull request #1508 from noironetworks/upgrade-k8s-go

Upgrade k8s packages and go version

13318 of 19289 relevant lines covered (69.04%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.96
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        v1net "k8s.io/api/networking/v1"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/util/workqueue"
40

41
        "github.com/noironetworks/aci-containers/pkg/apicapi"
42
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
43
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
44
        hppv1 "github.com/noironetworks/aci-containers/pkg/hpp/apis/aci.hpp/v1"
45
        "github.com/noironetworks/aci-containers/pkg/index"
46
        "github.com/noironetworks/aci-containers/pkg/ipam"
47
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
48
        "github.com/noironetworks/aci-containers/pkg/metadata"
49
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
50
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
51
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
52
        "github.com/noironetworks/aci-containers/pkg/util"
53
)
54

55
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
56
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
57
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
58
type listNetworkPoliciesFunc func(string) (*v1net.NetworkPolicyList, error)
59
type listNamespacesFunc func() (*v1.NamespaceList, error)
60

61
type AciController struct {
62
        log    *logrus.Logger
63
        config *ControllerConfig
64
        env    Environment
65

66
        defaultEg string
67
        defaultSg string
68

69
        unitTestMode bool
70

71
        podQueue              workqueue.RateLimitingInterface
72
        netPolQueue           workqueue.RateLimitingInterface
73
        qosQueue              workqueue.RateLimitingInterface
74
        serviceQueue          workqueue.RateLimitingInterface
75
        snatQueue             workqueue.RateLimitingInterface
76
        netflowQueue          workqueue.RateLimitingInterface
77
        erspanQueue           workqueue.RateLimitingInterface
78
        snatNodeInfoQueue     workqueue.RateLimitingInterface
79
        rdConfigQueue         workqueue.RateLimitingInterface
80
        istioQueue            workqueue.RateLimitingInterface
81
        nodeFabNetAttQueue    workqueue.RateLimitingInterface
82
        netFabConfigQueue     workqueue.RateLimitingInterface
83
        nadVlanMapQueue       workqueue.RateLimitingInterface
84
        fabricVlanPoolQueue   workqueue.RateLimitingInterface
85
        netFabL3ConfigQueue   workqueue.RateLimitingInterface
86
        remIpContQueue        workqueue.RateLimitingInterface
87
        epgDnCacheUpdateQueue workqueue.RateLimitingInterface
88

89
        namespaceIndexer                     cache.Indexer
90
        namespaceInformer                    cache.Controller
91
        podIndexer                           cache.Indexer
92
        podInformer                          cache.Controller
93
        endpointsIndexer                     cache.Indexer
94
        endpointsInformer                    cache.Controller
95
        serviceIndexer                       cache.Indexer
96
        serviceInformer                      cache.Controller
97
        replicaSetIndexer                    cache.Indexer
98
        replicaSetInformer                   cache.Controller
99
        deploymentIndexer                    cache.Indexer
100
        deploymentInformer                   cache.Controller
101
        nodeIndexer                          cache.Indexer
102
        nodeInformer                         cache.Controller
103
        networkPolicyIndexer                 cache.Indexer
104
        networkPolicyInformer                cache.Controller
105
        snatIndexer                          cache.Indexer
106
        snatInformer                         cache.Controller
107
        snatNodeInfoIndexer                  cache.Indexer
108
        snatNodeInformer                     cache.Controller
109
        snatLocalInfoInformer                cache.Controller
110
        crdInformer                          cache.Controller
111
        rdConfigInformer                     cache.Controller
112
        rdConfigIndexer                      cache.Indexer
113
        qosIndexer                           cache.Indexer
114
        qosInformer                          cache.Controller
115
        netflowIndexer                       cache.Indexer
116
        netflowInformer                      cache.Controller
117
        erspanIndexer                        cache.Indexer
118
        erspanInformer                       cache.Controller
119
        nodePodIfIndexer                     cache.Indexer
120
        nodePodIfInformer                    cache.Controller
121
        istioIndexer                         cache.Indexer
122
        istioInformer                        cache.Controller
123
        endpointSliceIndexer                 cache.Indexer
124
        endpointSliceInformer                cache.Controller
125
        snatCfgInformer                      cache.Controller
126
        updatePod                            podUpdateFunc
127
        updateNode                           nodeUpdateFunc
128
        updateServiceStatus                  serviceUpdateFunc
129
        listNetworkPolicies                  listNetworkPoliciesFunc
130
        listNamespaces                       listNamespacesFunc
131
        nodeFabNetAttInformer                cache.SharedIndexInformer
132
        netFabConfigInformer                 cache.SharedIndexInformer
133
        nadVlanMapInformer                   cache.SharedIndexInformer
134
        fabricVlanPoolInformer               cache.SharedIndexInformer
135
        networkFabricL3ConfigurationInformer cache.SharedIndexInformer
136
        fabNetAttClient                      *fabattclset.Clientset
137
        proactiveConfInformer                cache.SharedIndexInformer
138

139
        indexMutex sync.Mutex
140
        hppMutex   sync.Mutex
141

142
        configuredPodNetworkIps *netIps
143
        podNetworkIps           *netIps
144
        serviceIps              *ipam.IpCache
145
        staticServiceIps        *netIps
146
        nodeServiceIps          *netIps
147

148
        // index of pods matched by deployments
149
        depPods *index.PodSelectorIndex
150
        // index of pods matched by network policies
151
        netPolPods *index.PodSelectorIndex
152
        // index of pods matched by network policy ingress rules
153
        netPolIngressPods *index.PodSelectorIndex
154
        // index of pods matched by network policy egress rules
155
        netPolEgressPods *index.PodSelectorIndex
156
        // index of IP addresses contained in endpoints objects
157
        endpointsIpIndex cidranger.Ranger
158
        // index of service target ports
159
        targetPortIndex map[string]*portIndexEntry
160
        // index of ip blocks referenced by network policy egress rules
161
        netPolSubnetIndex cidranger.Ranger
162
        // index of pods matched by erspan policies
163
        erspanPolPods *index.PodSelectorIndex
164

165
        apicConn *apicapi.ApicConnection
166

167
        nodeServiceMetaCache map[string]*nodeServiceMeta
168
        nodeACIPod           map[string]aciPodAnnot
169
        nodeACIPodAnnot      map[string]aciPodAnnot
170
        nodeOpflexDevice     map[string]apicapi.ApicSlice
171
        nodePodNetCache      map[string]*nodePodNetMeta
172
        serviceMetaCache     map[string]*serviceMeta
173
        snatPolicyCache      map[string]*ContSnatPolicy
174
        delayedEpSlices      []*DelayedEpSlice
175
        snatServices         map[string]bool
176
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
177
        rdConfigCache        map[string]*rdConfig.RdConfig
178
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
179
        istioCache           map[string]*istiov1.AciIstioOperator
180
        podIftoEp            map[string]*EndPointData
181
        // Node Name and Policy Name
182
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
183
        nodeSyncEnabled     bool
184
        serviceSyncEnabled  bool
185
        snatSyncEnabled     bool
186
        syncQueue           workqueue.RateLimitingInterface
187
        syncProcessors      map[string]func() bool
188
        serviceEndPoints    ServiceEndPointType
189
        crdHandlers         map[string]func(*AciController, <-chan struct{})
190
        stopCh              <-chan struct{}
191
        //index of containerportname to ctrPortNameEntry
192
        ctrPortNameCache map[string]*ctrPortNameEntry
193
        // named networkPolicies
194
        nmPortNp map[string]bool
195
        //maps network policy hash to hpp
196
        hppRef map[string]hppReference
197
        //map for ns to remoteIpConts
198
        nsRemoteIpCont map[string]remoteIpConts
199
        // cache to look for Epg DNs which are bound to Vmm domain
200
        cachedEpgDns             []string
201
        vmmClusterFaultSupported bool
202
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
203
        //Used in Shared mode
204
        sharedEncapCache       map[int]*sharedEncapData
205
        sharedEncapAepCache    map[string]map[int]bool
206
        sharedEncapSviCache    map[int]*NfL3Data
207
        sharedEncapVrfCache    map[string]*NfVrfData
208
        sharedEncapTenantCache map[string]*NfTenantData
209
        nfl3configGenerationId int64
210
        // vlan to propertiesList
211
        sharedEncapNfcCache         map[int]*NfcData
212
        sharedEncapNfcVlanMap       map[int]*NfcData
213
        sharedEncapNfcLabelMap      map[string]*NfcData
214
        sharedEncapNfcAppProfileMap map[string]bool
215
        // nadVlanMap encapLabel to vlan
216
        sharedEncapLabelMap      map[string][]int
217
        lldpIfCache              map[string]*NfLLDPIfData
218
        globalVlanConfig         globalVlanConfig
219
        fabricVlanPoolMap        map[string]map[string]string
220
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
221
        openStackSystemId        string
222
}
223

224
type NfLLDPIfData struct {
225
        LLDPIf string
226
        // As of now, manage at the NAD level
227
        // more granular introduces intf tracking complexities
228
        // for not sufficient benefits
229
        Refs map[string]bool
230
}
231

232
type NfL3OutData struct {
233
        // +kubebuilder:validation:Enum:"import"
234
        RtCtrl     string
235
        PodId      int
236
        RtrNodeMap map[int]*fabattv1.FabricL3OutRtrNode
237
        ExtEpgMap  map[string]*fabattv1.PolicyPrefixGroup
238
        SviMap     map[int]bool
239
}
240

241
type NfTenantData struct {
242
        CommonTenant     bool
243
        L3OutConfig      map[string]*NfL3OutData
244
        BGPPeerPfxConfig map[string]*fabattv1.BGPPeerPrefixPolicy
245
}
246

247
type NfVrfData struct {
248
        TenantConfig map[string]*NfTenantData
249
}
250

251
type NfL3Networks struct {
252
        fabattv1.PrimaryNetwork
253
        Subnets map[string]*fabattv1.FabricL3Subnet
254
}
255

256
type NfL3Data struct {
257
        Tenant      string
258
        Vrf         fabattv1.VRF
259
        PodId       int
260
        ConnectedNw *NfL3Networks
261
        NetAddr     map[string]*RoutedNetworkData
262
        Nodes       map[int]fabattv1.FabricL3OutNode
263
}
264

265
// maps pod name to remoteIpCont
266
type remoteIpConts map[string]remoteIpCont
267

268
// remoteIpCont maps ip to pod labels
269
type remoteIpCont map[string]map[string]string
270

271
type NfcData struct {
272
        Aeps map[string]bool
273
        Epg  fabattv1.Epg
274
}
275

276
type sharedEncapData struct {
277
        //node to NAD to pods
278
        Pods   map[string]map[string][]string
279
        NetRef map[string]*AdditionalNetworkMeta
280
        Aeps   map[string]bool
281
}
282

283
type globalVlanConfig struct {
284
        SharedPhysDom apicapi.ApicObject
285
        SharedL3Dom   apicapi.ApicObject
286
}
287

288
type hppReference struct {
289
        RefCount uint              `json:"ref-count,omitempty"`
290
        Npkeys   []string          `json:"npkeys,omitempty"`
291
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
292
        HppCr    hppv1.HostprotPol `json:"hpp-cr,omitempty"`
293
}
294

295
type DelayedEpSlice struct {
296
        ServiceKey  string
297
        OldEpSlice  *discovery.EndpointSlice
298
        NewEpSlice  *discovery.EndpointSlice
299
        DelayedTime time.Time
300
}
301

302
type aciPodAnnot struct {
303
        aciPod             string
304
        isSingleOpflexOdev bool
305
        disconnectTime     time.Time
306
        connectTime        time.Time
307
        lastErrorTime      time.Time
308
}
309

310
type nodeServiceMeta struct {
311
        serviceEp metadata.ServiceEndpoint
312
}
313

314
type nodePodNetMeta struct {
315
        nodePods            map[string]bool
316
        podNetIps           metadata.NetIps
317
        podNetIpsAnnotation string
318
}
319

320
type openstackOpflexOdevInfo struct {
321
        opflexODevDn map[string]struct{}
322
        fabricPathDn string
323
}
324

325
type serviceMeta struct {
326
        requestedIps     []net.IP
327
        ingressIps       []net.IP
328
        staticIngressIps []net.IP
329
}
330

331
type ipIndexEntry struct {
332
        ipNet net.IPNet
333
        keys  map[string]bool
334
}
335

336
type targetPort struct {
337
        proto v1.Protocol
338
        ports []int
339
}
340

341
type portIndexEntry struct {
342
        port              targetPort
343
        serviceKeys       map[string]bool
344
        networkPolicyKeys map[string]bool
345
}
346

347
type portRangeSnat struct {
348
        start int
349
        end   int
350
}
351

352
// EndPointData holds PodIF data in controller.
353
type EndPointData struct {
354
        MacAddr    string
355
        EPG        string
356
        Namespace  string
357
        AppProfile string
358
}
359

360
type ctrPortNameEntry struct {
361
        // Proto+port->pods
362
        ctrNmpToPods map[string]map[string]bool
363
}
364

365
type LinkData struct {
366
        Link []string
367
        Pods []string
368
}
369

370
type RoutedNodeData struct {
371
        addr string
372
        idx  int
373
}
374

375
type RoutedNetworkData struct {
376
        subnet       string
377
        netAddress   string
378
        maskLen      int
379
        numAllocated int
380
        maxAddresses int
381
        baseAddress  net.IP
382
        nodeMap      map[string]RoutedNodeData
383
        availableMap map[int]bool
384
}
385

386
type AdditionalNetworkMeta struct {
387
        NetworkName string
388
        EncapVlan   string
389
        //node+localiface->fabricLinks
390
        FabricLink map[string]map[string]LinkData
391
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
392
        Mode       util.EncapMode
393
}
394

395
type ServiceEndPointType interface {
396
        InitClientInformer(kubeClient *kubernetes.Clientset)
397
        Run(stopCh <-chan struct{})
398
        Wait(stopCh <-chan struct{})
399
        UpdateServicesForNode(nodename string)
400
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
401
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
402
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
403
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
404
}
405

406
type serviceEndpoint struct {
407
        cont *AciController
408
}
409
type serviceEndpointSlice struct {
410
        cont *AciController
411
}
412

413
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
414
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
415
}
×
416

417
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
418
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
419
}
×
420

421
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
422
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
423
}
1✔
424

425
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
426
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
427
}
1✔
428

429
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
430
        cache.WaitForCacheSync(stopCh,
1✔
431
                sep.cont.endpointsInformer.HasSynced,
1✔
432
                sep.cont.serviceInformer.HasSynced)
1✔
433
}
1✔
434

435
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
436
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
437
        cache.WaitForCacheSync(stopCh,
1✔
438
                seps.cont.endpointSliceInformer.HasSynced,
1✔
439
                seps.cont.serviceInformer.HasSynced)
1✔
440
}
1✔
441

442
func (e *ipIndexEntry) Network() net.IPNet {
1✔
443
        return e.ipNet
1✔
444
}
1✔
445

446
func newNodePodNetMeta() *nodePodNetMeta {
1✔
447
        return &nodePodNetMeta{
1✔
448
                nodePods: make(map[string]bool),
1✔
449
        }
1✔
450
}
1✔
451

452
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
453
        return workqueue.NewNamedRateLimitingQueue(
1✔
454
                workqueue.NewMaxOfRateLimiter(
1✔
455
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
456
                                10*time.Second),
1✔
457
                        &workqueue.BucketRateLimiter{
1✔
458
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
459
                        },
1✔
460
                ),
1✔
461
                "delta")
1✔
462
}
1✔
463

464
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
465
        cont := &AciController{
1✔
466
                log:          log,
1✔
467
                config:       config,
1✔
468
                env:          env,
1✔
469
                defaultEg:    "",
1✔
470
                defaultSg:    "",
1✔
471
                unitTestMode: unittestmode,
1✔
472

1✔
473
                podQueue:              createQueue("pod"),
1✔
474
                netPolQueue:           createQueue("networkPolicy"),
1✔
475
                qosQueue:              createQueue("qos"),
1✔
476
                netflowQueue:          createQueue("netflow"),
1✔
477
                erspanQueue:           createQueue("erspan"),
1✔
478
                serviceQueue:          createQueue("service"),
1✔
479
                snatQueue:             createQueue("snat"),
1✔
480
                snatNodeInfoQueue:     createQueue("snatnodeinfo"),
1✔
481
                rdConfigQueue:         createQueue("rdconfig"),
1✔
482
                istioQueue:            createQueue("istio"),
1✔
483
                nodeFabNetAttQueue:    createQueue("nodefabricnetworkattachment"),
1✔
484
                netFabConfigQueue:     createQueue("networkfabricconfiguration"),
1✔
485
                nadVlanMapQueue:       createQueue("nadvlanmap"),
1✔
486
                fabricVlanPoolQueue:   createQueue("fabricvlanpool"),
1✔
487
                netFabL3ConfigQueue:   createQueue("networkfabricl3configuration"),
1✔
488
                remIpContQueue:        createQueue("remoteIpContainer"),
1✔
489
                epgDnCacheUpdateQueue: createQueue("epgDnCache"),
1✔
490
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
491
                        &workqueue.BucketRateLimiter{
1✔
492
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
493
                        }, "sync"),
1✔
494

1✔
495
                configuredPodNetworkIps: newNetIps(),
1✔
496
                podNetworkIps:           newNetIps(),
1✔
497
                serviceIps:              ipam.NewIpCache(),
1✔
498
                staticServiceIps:        newNetIps(),
1✔
499
                nodeServiceIps:          newNetIps(),
1✔
500

1✔
501
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
502
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
503
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
504

1✔
505
                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
1✔
506
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
1✔
507
                serviceMetaCache:            make(map[string]*serviceMeta),
1✔
508
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
1✔
509
                snatServices:                make(map[string]bool),
1✔
510
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
1✔
511
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
1✔
512
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
1✔
513
                podIftoEp:                   make(map[string]*EndPointData),
1✔
514
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
515
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
1✔
516
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
1✔
517
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
1✔
518
                nmPortNp:                    make(map[string]bool),
1✔
519
                hppRef:                      make(map[string]hppReference),
1✔
520
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
1✔
521
                sharedEncapCache:            make(map[int]*sharedEncapData),
1✔
522
                sharedEncapAepCache:         make(map[string]map[int]bool),
1✔
523
                sharedEncapSviCache:         make(map[int]*NfL3Data),
1✔
524
                sharedEncapVrfCache:         make(map[string]*NfVrfData),
1✔
525
                sharedEncapTenantCache:      make(map[string]*NfTenantData),
1✔
526
                sharedEncapNfcCache:         make(map[int]*NfcData),
1✔
527
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
1✔
528
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
1✔
529
                sharedEncapNfcAppProfileMap: make(map[string]bool),
1✔
530
                sharedEncapLabelMap:         make(map[string][]int),
1✔
531
                lldpIfCache:                 make(map[string]*NfLLDPIfData),
1✔
532
                fabricVlanPoolMap:           make(map[string]map[string]string),
1✔
533
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
1✔
534
                nsRemoteIpCont:              make(map[string]remoteIpConts),
1✔
535
        }
1✔
536
        cont.syncProcessors = map[string]func() bool{
1✔
537
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
538
                "rdConfig":       cont.syncRdConfig,
1✔
539
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
540
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
541
                   and aci-containers-operator images for istio.io/istio package
1✔
542

1✔
543
                "istioCR":        cont.createIstioCR,
1✔
544
                */
1✔
545
        }
1✔
546
        return cont
1✔
547
}
1✔
548

549
func (cont *AciController) Init() {
×
550
        if cont.config.ChainedMode {
×
551
                cont.log.Info("In chained mode")
×
552
        }
×
553

554
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
555
        if err != nil {
×
556
                cont.log.Error("Could not serialize default endpoint group")
×
557
                panic(err.Error())
×
558
        }
559
        cont.defaultEg = string(egdata)
×
560

×
561
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
562
        if err != nil {
×
563
                cont.log.Error("Could not serialize default security groups")
×
564
                panic(err.Error())
×
565
        }
566
        cont.defaultSg = string(sgdata)
×
567

×
568
        cont.log.Debug("Initializing IPAM")
×
569
        cont.initIpam()
×
570
        // check if the cluster supports endpoint slices
×
571
        // if cluster doesn't have the support fallback to endpoints
×
572
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
573
        if util.IsEndPointSlicesSupported(kubeClient) {
×
574
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
575
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
576
                cont.log.Info("Initializing ServiceEndpointSlices")
×
577
        } else {
×
578
                cont.serviceEndPoints = &serviceEndpoint{}
×
579
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
580
                cont.log.Info("Initializing ServiceEndpoints")
×
581
        }
×
582

583
        err = cont.env.Init(cont)
×
584
        if err != nil {
×
585
                panic(err.Error())
×
586
        }
587
}
588

589
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
590
        store cache.Store, handler func(interface{}) bool,
591
        deleteHandler func(string) bool,
592
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
593
        go wait.Until(func() {
2✔
594
                for {
2✔
595
                        key, quit := queue.Get()
1✔
596
                        if quit {
2✔
597
                                break
1✔
598
                        }
599

600
                        var requeue bool
1✔
601
                        switch key := key.(type) {
1✔
602
                        case chan struct{}:
×
603
                                close(key)
×
604
                        case string:
1✔
605
                                if strings.HasPrefix(key, "DELETED_") {
2✔
606
                                        delKey := strings.Trim(key, "DELETED_")
1✔
607
                                        requeue = deleteHandler(delKey)
1✔
608
                                } else {
2✔
609
                                        obj, exists, err := store.GetByKey(key)
1✔
610
                                        if err != nil {
1✔
611
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
612
                                        }
×
613
                                        //Handle Add/Update/Delete
614
                                        if exists && handler != nil {
2✔
615
                                                requeue = handler(obj)
1✔
616
                                        }
1✔
617
                                        //Handle Post Delete
618
                                        if !exists && postDelHandler != nil {
1✔
619
                                                requeue = postDelHandler()
×
620
                                        }
×
621
                                }
622
                        }
623
                        if requeue {
2✔
624
                                queue.AddRateLimited(key)
1✔
625
                        } else {
2✔
626
                                queue.Forget(key)
1✔
627
                        }
1✔
628
                        queue.Done(key)
1✔
629
                }
630
        }, time.Second, stopCh)
631
        <-stopCh
1✔
632
        queue.ShutDown()
1✔
633
}
634

635
func (cont *AciController) processRemIpContQueue(queue workqueue.RateLimitingInterface,
636
        handler func(interface{}) bool,
637
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
638
        go wait.Until(func() {
2✔
639
                for {
2✔
640
                        key, quit := queue.Get()
1✔
641
                        if quit {
2✔
642
                                break
1✔
643
                        }
644

645
                        var requeue bool
1✔
646
                        switch key := key.(type) {
1✔
647
                        case chan struct{}:
×
648
                                close(key)
×
649
                        case string:
1✔
650
                                if handler != nil {
2✔
651
                                        requeue = handler(key)
1✔
652
                                }
1✔
653
                                if postDelHandler != nil {
2✔
654
                                        requeue = postDelHandler()
1✔
655
                                }
1✔
656
                        }
657
                        if requeue {
1✔
658
                                queue.AddRateLimited(key)
×
659
                        } else {
1✔
660
                                queue.Forget(key)
1✔
661
                        }
1✔
662
                        queue.Done(key)
1✔
663

664
                }
665
        }, time.Second, stopCh)
666
        <-stopCh
1✔
667
        queue.ShutDown()
1✔
668
}
669

670
func (cont *AciController) processEpgDnCacheUpdateQueue(queue workqueue.RateLimitingInterface,
671
        handler func(interface{}) bool,
672
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
673
        go wait.Until(func() {
2✔
674
                for {
2✔
675
                        key, quit := queue.Get()
1✔
676
                        if quit {
2✔
677
                                break
1✔
678
                        }
679

680
                        var requeue bool
1✔
681
                        switch key := key.(type) {
1✔
682
                        case chan struct{}:
×
683
                                close(key)
×
684
                        case bool:
1✔
685
                                if handler != nil {
2✔
686
                                        requeue = handler(key)
1✔
687
                                }
1✔
688
                                if postDelHandler != nil {
1✔
689
                                        requeue = postDelHandler()
×
690
                                }
×
691
                        }
692
                        if requeue {
1✔
693
                                queue.AddRateLimited(key)
×
694
                        } else {
1✔
695
                                queue.Forget(key)
1✔
696
                        }
1✔
697
                        queue.Done(key)
1✔
698

699
                }
700
        }, time.Second, stopCh)
701
        <-stopCh
1✔
702
        queue.ShutDown()
1✔
703
}
704

705
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
706
        return apicapi.ApicSlice{}
1✔
707
}
1✔
708

709
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
710
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
711
}
1✔
712

713
func (cont *AciController) initStaticObjs() {
1✔
714
        cont.env.InitStaticAciObjects()
1✔
715
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
716
                cont.globalStaticObjs())
1✔
717
}
1✔
718

719
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
720
        vmmProv = "Kubernetes"
1✔
721
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
722
                vmmProv = "OpenShift"
×
723
        }
×
724
        return
1✔
725
}
726

727
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
728
        var err error
1✔
729
        var privKey []byte
1✔
730
        var apicCert []byte
1✔
731

1✔
732
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
733

1✔
734
        if cont.config.ApicPrivateKeyPath != "" {
1✔
735
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
736
                if err != nil {
×
737
                        panic(err)
×
738
                }
739
        }
740
        if cont.config.ApicCertPath != "" {
1✔
741
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
742
                if err != nil {
×
743
                        panic(err)
×
744
                }
745
        }
746
        // If not defined, default is 1800
747
        if cont.config.ApicRefreshTimer == "" {
2✔
748
                cont.config.ApicRefreshTimer = "1800"
1✔
749
        }
1✔
750
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
751
        if err != nil {
1✔
752
                panic(err)
×
753
        }
754
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
755

1✔
756
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
757
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
758
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
759
                panic(err)
×
760
        }
761

762
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
763
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
764
                cont.config.ApicRefreshTickerAdjust = "210"
1✔
765
        }
1✔
766
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
767
        if err != nil {
1✔
768
                panic(err)
×
769
        }
770

771
        //If ApicSubscriptionDelay is not defined, default to 100ms
772
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
773
                cont.config.ApicSubscriptionDelay = 100
1✔
774
        }
1✔
775
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
776

1✔
777
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
778
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
779
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
780
        }
1✔
781

782
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
783
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
784
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
785
        }
1✔
786
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
787

1✔
788
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
789
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
790
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
791
        }
1✔
792

793
        // If not defined, default to 32
794
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
795
                cont.config.PodIpPoolChunkSize = 32
1✔
796
        }
1✔
797
        if !cont.config.ChainedMode {
2✔
798
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
799
        }
1✔
800

801
        // If ApicConnectionRetryLimit is not defined, default to 5
802
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
803
                cont.config.ApicConnectionRetryLimit = 5
1✔
804
        }
1✔
805
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
806

1✔
807
        // If not valid, default to 5000-65000
1✔
808
        // other permissible values 1-65000
1✔
809
        defStart := 5000
1✔
810
        defEnd := 65000
1✔
811
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
812
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
813
        }
1✔
814
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
815
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
816
        }
1✔
817
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
818
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
819
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
820
                cont.config.SnatDefaultPortRangeStart = defStart
×
821
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
822
        }
×
823

824
        // Set default value for pbr programming delay if services list is not empty
825
        // and delay value is empty
826
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
827
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
828
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
829
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
830
        }
×
831
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
832
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
833
        }
×
834

835
        // Set contract scope for snat svc graph to global by default
836
        if cont.config.SnatSvcContractScope == "" {
2✔
837
                cont.config.SnatSvcContractScope = "global"
1✔
838
        }
1✔
839
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
840
                cont.config.MaxSvcGraphNodes = 32
1✔
841
        }
1✔
842
        if !cont.config.ChainedMode {
2✔
843
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
844
        }
1✔
845
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
846
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
847
                privKey, apicCert, cont.config.AciPrefix,
1✔
848
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
849
                cont.config.AciVrfTenant, cont.UpdateLLDPIfLocked)
1✔
850
        if err != nil {
1✔
851
                panic(err)
×
852
        }
853

854
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
855

1✔
856
        if len(cont.config.ApicHosts) != 0 {
1✔
857
        APIC_SWITCH:
×
858
                cont.log.WithFields(logrus.Fields{
×
859
                        "mod":  "APICAPI",
×
860
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
861
                }).Debug("Connecting to APIC to determine the Version")
×
862

×
863
                version, err := cont.apicConn.GetVersion()
×
864
                if err != nil {
×
865
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
866
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
867
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
868
                        goto APIC_SWITCH
×
869
                }
870
                cont.apicConn.CachedVersion = version
×
871
                apicapi.ApicVersion = version
×
872
                if version >= "4.2(4i)" {
×
873
                        cont.apicConn.SnatPbrFltrChain = true
×
874
                } else {
×
875
                        cont.apicConn.SnatPbrFltrChain = false
×
876
                }
×
877
                if version >= "5.2" {
×
878
                        cont.vmmClusterFaultSupported = true
×
879
                }
×
880
        } else { // For unit-tests
1✔
881
                cont.apicConn.SnatPbrFltrChain = true
1✔
882
        }
1✔
883

884
        if !cont.config.ChainedMode {
2✔
885
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
886
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
887
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
888
                        var expectedVrfRelations []string
×
889
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
890
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
891
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
892
                        if err != nil {
×
893
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
894
                                panic(err)
×
895
                        }
896
                }
897
        }
898

899
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.config.ChainedMode {
1✔
900
                //Clear fault instances when the controller starts
×
901
                cont.clearFaultInstances()
×
902
                //Subscribe for vmmEpPD for a given domain
×
903
                var tnTargetFilterEpg string
×
904
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
905
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
906
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
907
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
908
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
909
                        func(obj apicapi.ApicObject) bool {
×
910
                                cont.vmmEpPDChanged(obj)
×
911
                                return true
×
912
                        },
×
913
                        func(dn string) {
×
914
                                cont.vmmEpPDDeleted(dn)
×
915
                        })
×
916
        }
917

918
        cont.initStaticObjs()
1✔
919

1✔
920
        err = cont.env.PrepareRun(stopCh)
1✔
921
        if err != nil {
1✔
922
                panic(err.Error())
×
923
        }
924

925
        cont.apicConn.FullSyncHook = func() {
1✔
926
                // put a channel into each work queue and wait on it to
×
927
                // checkpoint object syncing in response to new subscription
×
928
                // updates
×
929
                cont.log.Debug("Starting checkpoint")
×
930
                var chans []chan struct{}
×
931
                qs := make([]workqueue.RateLimitingInterface, 0)
×
932
                _, ok := cont.env.(*K8sEnvironment)
×
933
                if ok {
×
934
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
935
                        if !cont.config.ChainedMode {
×
936
                                if !cont.config.DisableHppRendering {
×
937
                                        qs = append(qs, cont.netPolQueue)
×
938
                                }
×
939
                                if cont.config.EnableHppDirect {
×
940
                                        qs = append(qs, cont.remIpContQueue)
×
941
                                }
×
942
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
943
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
944
                                        cont.rdConfigQueue, cont.erspanQueue,
×
945
                                        cont.epgDnCacheUpdateQueue)
×
946
                        }
947
                }
948
                for _, q := range qs {
×
949
                        c := make(chan struct{})
×
950
                        chans = append(chans, c)
×
951
                        q.Add(c)
×
952
                }
×
953
                for _, c := range chans {
×
954
                        <-c
×
955
                }
×
956
                cont.log.Debug("Checkpoint complete")
×
957
        }
958

959
        if len(cont.config.ApicHosts) != 0 && !cont.config.ChainedMode {
1✔
960
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
961
                cont.scheduleRdConfig()
×
962
                if strings.Contains(cont.config.Flavor, "openstack") {
×
963
                        cont.setOpenStackSystemId()
×
964
                }
×
965
        }
966

967
        if !cont.config.ChainedMode {
2✔
968
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
969
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
970
                                []string{"hostprotPol"})
1✔
971
                }
1✔
972
        } else {
1✔
973
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
974
                        []string{"fvBD", "fvAp"})
1✔
975
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
976
                        []string{"fvnsVlanInstP"}, "")
1✔
977
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
978
                        []string{"infraRsDomP"}, "")
1✔
979
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
980
                        []string{"physDomP"}, "")
1✔
981
                cont.apicConn.AddSubscriptionClass("l3extDomP",
1✔
982
                        []string{"l3extDomP"}, "")
1✔
983
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
984
                        []string{"infraRsVlanNs"}, "")
1✔
985
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
986
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
987
                cont.apicConn.AddSubscriptionClass("l3extOut",
1✔
988
                        []string{"l3extInstP", "l3extSubnet", "fvRsCons", "fvRsProv", "l3extRsEctx", "l3extRsL3DomAtt", "l3extLNodeP", "l3extRsNodeL3OutAtt", "ipRouteP", "ipNexthopP", "l3extLIfP", "l3extVirtualLIfP", "l3extRsDynPathAtt",
1✔
989
                                "l3extRsPathL3OutAtt", "l3extMember", "l3extIp", "bgpExtP", "bgpPeerP", "bgpAsP", "bgpLocalAsnP", "bgpRsPeerPfxPol"}, "")
1✔
990
                cont.apicConn.AddSubscriptionClass("bgpPeerPfxPol",
1✔
991
                        []string{"bgpPeerPfxPol"}, "")
1✔
992
        }
1✔
993
        if !cont.config.ChainedMode {
2✔
994
                // When a new class is added for subscriptio, check if its name attribute
1✔
995
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
996
                // in apicapi.go
1✔
997
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
998
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
999
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
1000
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
1001
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
1002
                }
×
1003
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
1004
                        subscribeMo)
1✔
1005
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
1006
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
1007
                        []string{"fvRsCons"})
1✔
1008
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
1009
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
1010
                        cont.config.AciVmmController)
1✔
1011
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
1012
                // Since it is not supported for APIC versions < "5.0"
1✔
1013
                cont.addVmmInjectedLabel()
1✔
1014
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
1015
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
1016

1✔
1017
                var tnTargetFilter string
1✔
1018
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
1019
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
1020
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
1021
                        }
×
1022
                } else {
1✔
1023
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
1024
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
1025
                }
1✔
1026
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
1027
                        tnTargetFilter)
1✔
1028
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
1029
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
1030

1✔
1031
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
1032
                        func(obj apicapi.ApicObject) bool {
1✔
1033
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
1034
                                return true
×
1035
                        },
×
1036
                        func(dn string) {
×
1037
                                cont.SubnetDeleted(dn)
×
1038
                        })
×
1039

1040
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
1041
                        []string{"opflexODev"}, "")
1✔
1042

1✔
1043
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
1044
                        func(obj apicapi.ApicObject) bool {
1✔
1045
                                cont.opflexDeviceChanged(obj)
×
1046
                                return true
×
1047
                        },
×
1048
                        func(dn string) {
×
1049
                                cont.opflexDeviceDeleted(dn)
×
1050
                        })
×
1051

1052
                cont.apicConn.VersionUpdateHook =
1✔
1053
                        func() {
1✔
1054
                                cont.initStaticServiceObjs()
×
1055
                        }
×
1056
        }
1057
        go cont.apicConn.Run(stopCh)
1✔
1058
}
1059

1060
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1061
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
1062
        ticker := time.NewTicker(seconds * time.Second)
1✔
1063
        defer ticker.Stop()
1✔
1064

1✔
1065
        for {
2✔
1066
                select {
1✔
1067
                case <-ticker.C:
1✔
1068
                        if cont.config.EnableOpflexAgentReconnect {
1✔
1069
                                cont.checkChangeOfOpflexOdevAciPod()
×
1070
                        }
×
1071
                        if cont.config.AciMultipod {
1✔
1072
                                cont.checkChangeOfOdevAciPod()
×
1073
                        }
×
1074
                case <-stopCh:
1✔
1075
                        return
1✔
1076
                }
1077
        }
1078
}
1079

1080
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1081
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
1082
        ticker := time.NewTicker(seconds * time.Second)
1✔
1083
        defer ticker.Stop()
1✔
1084

1✔
1085
        for {
2✔
1086
                select {
1✔
1087
                case <-ticker.C:
1✔
1088
                        cont.deleteOldOpflexDevices()
1✔
1089
                case <-stopCh:
1✔
1090
                        return
1✔
1091
                }
1092
        }
1093
}
1094

1095
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1096
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
1097
        ticker := time.NewTicker(seconds * time.Second)
1✔
1098
        defer ticker.Stop()
1✔
1099

1✔
1100
        for {
2✔
1101
                select {
1✔
1102
                case <-ticker.C:
1✔
1103
                        cont.processDelayedEpSlices()
1✔
1104
                case <-stopCh:
1✔
1105
                        return
1✔
1106
                }
1107
        }
1108
}
1109

1110
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
1111
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
1112
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
1113
        iteration := 0
1✔
1114
        for {
2✔
1115
                // To avoid noisy logs, only printing once in 5 minutes
1✔
1116
                if iteration%5 == 0 {
2✔
1117
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
1118
                }
1✔
1119
                var nodeInfos []*nodeinfo.NodeInfo
1✔
1120
                cont.indexMutex.Lock()
1✔
1121
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
1122
                        func(nodeInfoObj interface{}) {
2✔
1123
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
1124
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
1125
                        })
1✔
1126
                expectedmap := make(map[string]map[string]bool)
1✔
1127
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
1128
                        for nodename, entry := range glinfo {
2✔
1129
                                if _, found := expectedmap[nodename]; !found {
2✔
1130
                                        newentry := make(map[string]bool)
1✔
1131
                                        newentry[entry.SnatPolicyName] = true
1✔
1132
                                        expectedmap[nodename] = newentry
1✔
1133
                                } else {
2✔
1134
                                        currententry := expectedmap[nodename]
1✔
1135
                                        currententry[entry.SnatPolicyName] = true
1✔
1136
                                        expectedmap[nodename] = currententry
1✔
1137
                                }
1✔
1138
                        }
1139
                }
1140
                cont.indexMutex.Unlock()
1✔
1141

1✔
1142
                for _, value := range nodeInfos {
2✔
1143
                        marked := false
1✔
1144
                        policyNames := value.Spec.SnatPolicyNames
1✔
1145
                        nodeName := value.ObjectMeta.Name
1✔
1146
                        _, ok := expectedmap[nodeName]
1✔
1147
                        if !ok && len(policyNames) > 0 {
2✔
1148
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
1149
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
1150
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1151
                                marked = true
1✔
1152
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
1153
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1154
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1155
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1156
                                marked = true
1✔
1157
                        } else {
2✔
1158
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
1159
                                        // No snatpolicies present
×
1160
                                        continue
×
1161
                                }
1162
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
1163
                                if !eq {
2✔
1164
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1165
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1166
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1167
                                        marked = true
1✔
1168
                                }
1✔
1169
                        }
1170
                        if marked {
2✔
1171
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
1172
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
1173
                                if err != nil {
1✔
1174
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
1175
                                        continue
×
1176
                                }
1177
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1178
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1179
                        } else if iteration%5 == 0 {
2✔
1180
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1181
                        }
1✔
1182
                }
1183
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1184
                iteration++
1✔
1185
        }
1186
}
1187

1188
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1189
        queueStop <-chan struct{}) {
1✔
1190
        go wait.Until(func() {
2✔
1191
                for {
2✔
1192
                        syncType, quit := queue.Get()
1✔
1193
                        if quit {
2✔
1194
                                break
1✔
1195
                        }
1196
                        var requeue bool
1✔
1197
                        if sType, ok := syncType.(string); ok {
2✔
1198
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1199
                                        requeue = f()
1✔
1200
                                }
1✔
1201
                        }
1202
                        if requeue {
1✔
1203
                                queue.AddRateLimited(syncType)
×
1204
                        } else {
1✔
1205
                                queue.Forget(syncType)
1✔
1206
                        }
1✔
1207
                        queue.Done(syncType)
1✔
1208
                }
1209
        }, time.Second, queueStop)
1210
        <-queueStop
1✔
1211
        queue.ShutDown()
1✔
1212
}
1213

1214
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1215
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1216
}
1✔
1217
func (cont *AciController) scheduleRdConfig() {
×
1218
        cont.syncQueue.AddRateLimited("rdConfig")
×
1219
}
×
1220
func (cont *AciController) scheduleCreateIstioCR() {
×
1221
        cont.syncQueue.AddRateLimited("istioCR")
×
1222
}
×
1223

1224
func (cont *AciController) addVmmInjectedLabel() {
1✔
1225
        if apicapi.ApicVersion >= "5.2" {
1✔
1226
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1227
                if err != nil {
×
1228
                        panic(err.Error())
×
1229
                }
1230
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1231
                if err != nil {
×
1232
                        panic(err.Error())
×
1233
                }
1234
        }
1235
        if apicapi.ApicVersion >= "5.0" {
2✔
1236
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1237
                if err != nil {
1✔
1238
                        panic(err.Error())
×
1239
                }
1240
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1241
                if err != nil {
1✔
1242
                        panic(err.Error())
×
1243
                }
1244
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1245
                if err != nil {
1✔
1246
                        panic(err.Error())
×
1247
                }
1248
        }
1249
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc