• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11373

17 Nov 2025 04:25AM UTC coverage: 63.619% (-0.2%) from 63.784%
11373

push

travis-pro

web-flow
Merge pull request #1636 from noironetworks/clean_sync_nads_and_apic

Fixed following things:

0 of 174 new or added lines in 2 files covered. (0.0%)

6 existing lines in 3 files now uncovered.

13369 of 21014 relevant lines covered (63.62%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.24
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        v1net "k8s.io/api/networking/v1"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/util/workqueue"
40

41
        "github.com/noironetworks/aci-containers/pkg/apicapi"
42
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
43
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
44
        hppv1 "github.com/noironetworks/aci-containers/pkg/hpp/apis/aci.hpp/v1"
45
        "github.com/noironetworks/aci-containers/pkg/index"
46
        "github.com/noironetworks/aci-containers/pkg/ipam"
47
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
48
        "github.com/noironetworks/aci-containers/pkg/metadata"
49
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
50
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
51
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
52
        "github.com/noironetworks/aci-containers/pkg/util"
53
)
54

55
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
56
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
57
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
58
type listNetworkPoliciesFunc func(string) (*v1net.NetworkPolicyList, error)
59
type listNamespacesFunc func() (*v1.NamespaceList, error)
60

61
type AciController struct {
62
        log    *logrus.Logger
63
        config *ControllerConfig
64
        env    Environment
65

66
        defaultEg string
67
        defaultSg string
68

69
        unitTestMode bool
70

71
        podQueue               workqueue.RateLimitingInterface
72
        netPolQueue            workqueue.RateLimitingInterface
73
        qosQueue               workqueue.RateLimitingInterface
74
        serviceQueue           workqueue.RateLimitingInterface
75
        snatQueue              workqueue.RateLimitingInterface
76
        netflowQueue           workqueue.RateLimitingInterface
77
        erspanQueue            workqueue.RateLimitingInterface
78
        snatNodeInfoQueue      workqueue.RateLimitingInterface
79
        rdConfigQueue          workqueue.RateLimitingInterface
80
        istioQueue             workqueue.RateLimitingInterface
81
        nodeFabNetAttQueue     workqueue.RateLimitingInterface
82
        netFabConfigQueue      workqueue.RateLimitingInterface
83
        nadVlanMapQueue        workqueue.RateLimitingInterface
84
        fabricVlanPoolQueue    workqueue.RateLimitingInterface
85
        netFabL3ConfigQueue    workqueue.RateLimitingInterface
86
        remIpContQueue         workqueue.RateLimitingInterface
87
        epgDnCacheUpdateQueue  workqueue.RateLimitingInterface
88
        aaepMonitorConfigQueue workqueue.RateLimitingInterface
89

90
        namespaceIndexer                     cache.Indexer
91
        namespaceInformer                    cache.Controller
92
        podIndexer                           cache.Indexer
93
        podInformer                          cache.Controller
94
        endpointsIndexer                     cache.Indexer
95
        endpointsInformer                    cache.Controller
96
        serviceIndexer                       cache.Indexer
97
        serviceInformer                      cache.Controller
98
        replicaSetIndexer                    cache.Indexer
99
        replicaSetInformer                   cache.Controller
100
        deploymentIndexer                    cache.Indexer
101
        deploymentInformer                   cache.Controller
102
        nodeIndexer                          cache.Indexer
103
        nodeInformer                         cache.Controller
104
        networkPolicyIndexer                 cache.Indexer
105
        networkPolicyInformer                cache.Controller
106
        snatIndexer                          cache.Indexer
107
        snatInformer                         cache.Controller
108
        snatNodeInfoIndexer                  cache.Indexer
109
        snatNodeInformer                     cache.Controller
110
        snatLocalInfoInformer                cache.Controller
111
        crdInformer                          cache.Controller
112
        rdConfigInformer                     cache.Controller
113
        rdConfigIndexer                      cache.Indexer
114
        qosIndexer                           cache.Indexer
115
        qosInformer                          cache.Controller
116
        netflowIndexer                       cache.Indexer
117
        netflowInformer                      cache.Controller
118
        erspanIndexer                        cache.Indexer
119
        erspanInformer                       cache.Controller
120
        nodePodIfIndexer                     cache.Indexer
121
        nodePodIfInformer                    cache.Controller
122
        istioIndexer                         cache.Indexer
123
        istioInformer                        cache.Controller
124
        endpointSliceIndexer                 cache.Indexer
125
        endpointSliceInformer                cache.Controller
126
        snatCfgInformer                      cache.Controller
127
        updatePod                            podUpdateFunc
128
        updateNode                           nodeUpdateFunc
129
        updateServiceStatus                  serviceUpdateFunc
130
        listNetworkPolicies                  listNetworkPoliciesFunc
131
        listNamespaces                       listNamespacesFunc
132
        nodeFabNetAttInformer                cache.SharedIndexInformer
133
        netFabConfigInformer                 cache.SharedIndexInformer
134
        nadVlanMapInformer                   cache.SharedIndexInformer
135
        fabricVlanPoolInformer               cache.SharedIndexInformer
136
        networkFabricL3ConfigurationInformer cache.SharedIndexInformer
137
        fabNetAttClient                      *fabattclset.Clientset
138
        proactiveConfInformer                cache.SharedIndexInformer
139
        aaepMonitorInformer                  cache.SharedIndexInformer
140
        poster                               *EventPoster
141

142
        indexMutex sync.Mutex
143
        hppMutex   sync.Mutex
144

145
        configuredPodNetworkIps *netIps
146
        podNetworkIps           *netIps
147
        serviceIps              *ipam.IpCache
148
        staticServiceIps        *netIps
149
        nodeServiceIps          *netIps
150

151
        // index of pods matched by deployments
152
        depPods *index.PodSelectorIndex
153
        // index of pods matched by network policies
154
        netPolPods *index.PodSelectorIndex
155
        // index of pods matched by network policy ingress rules
156
        netPolIngressPods *index.PodSelectorIndex
157
        // index of pods matched by network policy egress rules
158
        netPolEgressPods *index.PodSelectorIndex
159
        // index of IP addresses contained in endpoints objects
160
        endpointsIpIndex cidranger.Ranger
161
        // index of service target ports
162
        targetPortIndex map[string]*portIndexEntry
163
        // index of ip blocks referenced by network policy egress rules
164
        netPolSubnetIndex cidranger.Ranger
165
        // index of pods matched by erspan policies
166
        erspanPolPods *index.PodSelectorIndex
167

168
        apicConn *apicapi.ApicConnection
169

170
        nodeServiceMetaCache map[string]*nodeServiceMeta
171
        nodeACIPod           map[string]aciPodAnnot
172
        nodeACIPodAnnot      map[string]aciPodAnnot
173
        nodeOpflexDevice     map[string]apicapi.ApicSlice
174
        nodePodNetCache      map[string]*nodePodNetMeta
175
        serviceMetaCache     map[string]*serviceMeta
176
        snatPolicyCache      map[string]*ContSnatPolicy
177
        delayedEpSlices      []*DelayedEpSlice
178
        snatServices         map[string]bool
179
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
180
        rdConfigCache        map[string]*rdConfig.RdConfig
181
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
182
        istioCache           map[string]*istiov1.AciIstioOperator
183
        podIftoEp            map[string]*EndPointData
184
        // Node Name and Policy Name
185
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
186
        nodeSyncEnabled     bool
187
        serviceSyncEnabled  bool
188
        snatSyncEnabled     bool
189
        syncQueue           workqueue.RateLimitingInterface
190
        syncProcessors      map[string]func() bool
191
        serviceEndPoints    ServiceEndPointType
192
        crdHandlers         map[string]func(*AciController, <-chan struct{})
193
        stopCh              <-chan struct{}
194
        //index of containerportname to ctrPortNameEntry
195
        ctrPortNameCache map[string]*ctrPortNameEntry
196
        // named networkPolicies
197
        nmPortNp map[string]bool
198
        //maps network policy hash to hpp
199
        hppRef map[string]hppReference
200
        //map for ns to remoteIpConts
201
        nsRemoteIpCont map[string]remoteIpConts
202
        // cache to look for Epg DNs which are bound to Vmm domain
203
        cachedEpgDns             []string
204
        vmmClusterFaultSupported bool
205
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
206
        //Used in Shared mode
207
        sharedEncapCache       map[int]*sharedEncapData
208
        sharedEncapAepCache    map[string]map[int]bool
209
        sharedEncapSviCache    map[int]*NfL3Data
210
        sharedEncapVrfCache    map[string]*NfVrfData
211
        sharedEncapTenantCache map[string]*NfTenantData
212
        nfl3configGenerationId int64
213
        // vlan to propertiesList
214
        sharedEncapNfcCache         map[int]*NfcData
215
        sharedEncapNfcVlanMap       map[int]*NfcData
216
        sharedEncapNfcLabelMap      map[string]*NfcData
217
        sharedEncapNfcAppProfileMap map[string]bool
218
        // nadVlanMap encapLabel to vlan
219
        sharedEncapLabelMap      map[string][]int
220
        lldpIfCache              map[string]*NfLLDPIfData
221
        globalVlanConfig         globalVlanConfig
222
        fabricVlanPoolMap        map[string]map[string]string
223
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
224
        hostFabricPathDnMap      map[string]hostFabricInfo
225
        openStackSystemId        string
226
        sharedAaepMonitor        map[string]map[string]*AaepEpgAttachData
227
}
228

229
// hostFabricInfo is the value type of AciController.hostFabricPathDnMap.
type hostFabricInfo struct {
	fabricPathDn string
	host         string
	// vpcIfDn is used as a set of strings.
	vpcIfDn map[string]struct{}
}
234

235
// NfLLDPIfData is the value type of AciController.lldpIfCache.
type NfLLDPIfData struct {
	LLDPIf string
	// As of now, manage at the NAD level
	// more granular introduces intf tracking complexities
	// for not sufficient benefits
	Refs map[string]bool
}
242

243
type NfL3OutData struct {
244
        // +kubebuilder:validation:Enum:"import"
245
        RtCtrl     string
246
        PodId      int
247
        RtrNodeMap map[int]*fabattv1.FabricL3OutRtrNode
248
        ExtEpgMap  map[string]*fabattv1.PolicyPrefixGroup
249
        SviMap     map[int]bool
250
}
251

252
type NfTenantData struct {
253
        CommonTenant     bool
254
        L3OutConfig      map[string]*NfL3OutData
255
        BGPPeerPfxConfig map[string]*fabattv1.BGPPeerPrefixPolicy
256
}
257

258
type NfVrfData struct {
259
        TenantConfig map[string]*NfTenantData
260
}
261

262
type NfL3Networks struct {
263
        fabattv1.PrimaryNetwork
264
        Subnets map[string]*fabattv1.FabricL3Subnet
265
}
266

267
type NfL3Data struct {
268
        Tenant      string
269
        Vrf         fabattv1.VRF
270
        PodId       int
271
        ConnectedNw *NfL3Networks
272
        NetAddr     map[string]*RoutedNetworkData
273
        Nodes       map[int]fabattv1.FabricL3OutNode
274
}
275

276
// remoteIpConts maps pod name to remoteIpCont. It is the value type of
// AciController.nsRemoteIpCont.
type remoteIpConts map[string]remoteIpCont

// remoteIpCont maps ip to pod labels.
type remoteIpCont map[string]map[string]string
281

282
type NfcData struct {
283
        Aeps map[string]bool
284
        Epg  fabattv1.Epg
285
}
286

287
type sharedEncapData struct {
288
        //node to NAD to pods
289
        Pods   map[string]map[string][]string
290
        NetRef map[string]*AdditionalNetworkMeta
291
        Aeps   map[string]bool
292
}
293

294
type globalVlanConfig struct {
295
        SharedPhysDom apicapi.ApicObject
296
        SharedL3Dom   apicapi.ApicObject
297
}
298

299
type hppReference struct {
300
        RefCount uint              `json:"ref-count,omitempty"`
301
        Npkeys   []string          `json:"npkeys,omitempty"`
302
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
303
        HppCr    hppv1.HostprotPol `json:"hpp-cr,omitempty"`
304
}
305

306
type DelayedEpSlice struct {
307
        ServiceKey  string
308
        OldEpSlice  *discovery.EndpointSlice
309
        NewEpSlice  *discovery.EndpointSlice
310
        DelayedTime time.Time
311
}
312

313
// aciPodAnnot is the value type of AciController.nodeACIPod and
// AciController.nodeACIPodAnnot.
type aciPodAnnot struct {
	aciPod         string
	disconnectTime time.Time
	lastErrorTime  time.Time
}
318

319
type nodeServiceMeta struct {
320
        serviceEp metadata.ServiceEndpoint
321
}
322

323
type nodePodNetMeta struct {
324
        nodePods            map[string]bool
325
        podNetIps           metadata.NetIps
326
        podNetIpsAnnotation string
327
}
328

329
// openstackOpflexOdevInfo is the value type of
// AciController.openStackFabricPathDnMap.
type openstackOpflexOdevInfo struct {
	// opflexODevDn is used as a set of strings.
	opflexODevDn map[string]struct{}
	fabricPathDn string
}
333

334
// serviceMeta is the value type of AciController.serviceMetaCache; it holds
// three lists of IP addresses for a service.
type serviceMeta struct {
	requestedIps     []net.IP
	ingressIps       []net.IP
	staticIngressIps []net.IP
}
339

340
// ipIndexEntry pairs an IP network with a set of keys. Its Network method
// returns ipNet.
type ipIndexEntry struct {
	ipNet net.IPNet
	keys  map[string]bool
}
344

345
type targetPort struct {
346
        proto v1.Protocol
347
        ports []int
348
}
349

350
type portIndexEntry struct {
351
        port              targetPort
352
        serviceKeys       map[string]bool
353
        networkPolicyKeys map[string]bool
354
}
355

356
// portRangeSnat is a port range given by its start and end values.
type portRangeSnat struct {
	start int
	end   int
}
360

361
// EndPointData holds PodIF data in controller. It is the value type of
// AciController.podIftoEp.
type EndPointData struct {
	MacAddr    string
	EPG        string
	Namespace  string
	AppProfile string
}
368

369
// ctrPortNameEntry is the value type of AciController.ctrPortNameCache.
type ctrPortNameEntry struct {
	// Proto+port->pods
	ctrNmpToPods map[string]map[string]bool
}
373

374
// LinkData is the innermost value type of AdditionalNetworkMeta.FabricLink:
// a list of links and a list of pods.
type LinkData struct {
	Link []string
	Pods []string
}
378

379
// RoutedNodeData is the value type of RoutedNetworkData.nodeMap: an address
// string and an integer index.
type RoutedNodeData struct {
	addr string
	idx  int
}
383

384
type RoutedNetworkData struct {
385
        subnet       string
386
        netAddress   string
387
        maskLen      int
388
        numAllocated int
389
        maxAddresses int
390
        baseAddress  net.IP
391
        nodeMap      map[string]RoutedNodeData
392
        availableMap map[int]bool
393
}
394

395
type AdditionalNetworkMeta struct {
396
        NetworkName string
397
        EncapVlan   string
398
        //node+localiface->fabricLinks
399
        FabricLink map[string]map[string]LinkData
400
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
401
        Mode       util.EncapMode
402
}
403

404
type ServiceEndPointType interface {
405
        InitClientInformer(kubeClient *kubernetes.Clientset)
406
        Run(stopCh <-chan struct{})
407
        Wait(stopCh <-chan struct{})
408
        UpdateServicesForNode(nodename string)
409
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
410
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
411
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
412
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
413
}
414

415
type serviceEndpoint struct {
416
        cont *AciController
417
}
418
type serviceEndpointSlice struct {
419
        cont *AciController
420
}
421

422
// AaepEpgAttachData is the innermost value type of
// AciController.sharedAaepMonitor.
type AaepEpgAttachData struct {
	encapVlan     int
	nadName       string
	namespaceName string
	nadCreated    bool
}
428

429
// EpgVlanMap pairs an EPG DN with an encap VLAN.
type EpgVlanMap struct {
	epgDn     string
	encapVlan int
}
433

434
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
435
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
436
}
×
437

438
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
439
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
440
}
×
441

442
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
443
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
444
}
1✔
445

446
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
447
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
448
}
1✔
449

450
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
451
        cache.WaitForCacheSync(stopCh,
1✔
452
                sep.cont.endpointsInformer.HasSynced,
1✔
453
                sep.cont.serviceInformer.HasSynced)
1✔
454
}
1✔
455

456
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
457
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
458
        cache.WaitForCacheSync(stopCh,
1✔
459
                seps.cont.endpointSliceInformer.HasSynced,
1✔
460
                seps.cont.serviceInformer.HasSynced)
1✔
461
}
1✔
462

463
func (e *ipIndexEntry) Network() net.IPNet {
1✔
464
        return e.ipNet
1✔
465
}
1✔
466

467
func newNodePodNetMeta() *nodePodNetMeta {
1✔
468
        return &nodePodNetMeta{
1✔
469
                nodePods: make(map[string]bool),
1✔
470
        }
1✔
471
}
1✔
472

473
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
474
        return workqueue.NewNamedRateLimitingQueue(
1✔
475
                workqueue.NewMaxOfRateLimiter(
1✔
476
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
477
                                10*time.Second),
1✔
478
                        &workqueue.BucketRateLimiter{
1✔
479
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
480
                        },
1✔
481
                ),
1✔
482
                "delta")
1✔
483
}
1✔
484

485
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
486
        cont := &AciController{
1✔
487
                log:          log,
1✔
488
                config:       config,
1✔
489
                env:          env,
1✔
490
                defaultEg:    "",
1✔
491
                defaultSg:    "",
1✔
492
                unitTestMode: unittestmode,
1✔
493

1✔
494
                podQueue:               createQueue("pod"),
1✔
495
                netPolQueue:            createQueue("networkPolicy"),
1✔
496
                qosQueue:               createQueue("qos"),
1✔
497
                netflowQueue:           createQueue("netflow"),
1✔
498
                erspanQueue:            createQueue("erspan"),
1✔
499
                serviceQueue:           createQueue("service"),
1✔
500
                snatQueue:              createQueue("snat"),
1✔
501
                snatNodeInfoQueue:      createQueue("snatnodeinfo"),
1✔
502
                rdConfigQueue:          createQueue("rdconfig"),
1✔
503
                istioQueue:             createQueue("istio"),
1✔
504
                nodeFabNetAttQueue:     createQueue("nodefabricnetworkattachment"),
1✔
505
                netFabConfigQueue:      createQueue("networkfabricconfiguration"),
1✔
506
                nadVlanMapQueue:        createQueue("nadvlanmap"),
1✔
507
                fabricVlanPoolQueue:    createQueue("fabricvlanpool"),
1✔
508
                netFabL3ConfigQueue:    createQueue("networkfabricl3configuration"),
1✔
509
                remIpContQueue:         createQueue("remoteIpContainer"),
1✔
510
                epgDnCacheUpdateQueue:  createQueue("epgDnCache"),
1✔
511
                aaepMonitorConfigQueue: createQueue("aaepepgmap"),
1✔
512
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
513
                        &workqueue.BucketRateLimiter{
1✔
514
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
515
                        }, "sync"),
1✔
516

1✔
517
                configuredPodNetworkIps: newNetIps(),
1✔
518
                podNetworkIps:           newNetIps(),
1✔
519
                serviceIps:              ipam.NewIpCache(),
1✔
520
                staticServiceIps:        newNetIps(),
1✔
521
                nodeServiceIps:          newNetIps(),
1✔
522

1✔
523
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
524
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
525
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
526

1✔
527
                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
1✔
528
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
1✔
529
                serviceMetaCache:            make(map[string]*serviceMeta),
1✔
530
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
1✔
531
                snatServices:                make(map[string]bool),
1✔
532
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
1✔
533
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
1✔
534
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
1✔
535
                podIftoEp:                   make(map[string]*EndPointData),
1✔
536
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
537
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
1✔
538
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
1✔
539
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
1✔
540
                nmPortNp:                    make(map[string]bool),
1✔
541
                hppRef:                      make(map[string]hppReference),
1✔
542
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
1✔
543
                sharedEncapCache:            make(map[int]*sharedEncapData),
1✔
544
                sharedEncapAepCache:         make(map[string]map[int]bool),
1✔
545
                sharedEncapSviCache:         make(map[int]*NfL3Data),
1✔
546
                sharedEncapVrfCache:         make(map[string]*NfVrfData),
1✔
547
                sharedEncapTenantCache:      make(map[string]*NfTenantData),
1✔
548
                sharedEncapNfcCache:         make(map[int]*NfcData),
1✔
549
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
1✔
550
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
1✔
551
                sharedEncapNfcAppProfileMap: make(map[string]bool),
1✔
552
                sharedEncapLabelMap:         make(map[string][]int),
1✔
553
                lldpIfCache:                 make(map[string]*NfLLDPIfData),
1✔
554
                fabricVlanPoolMap:           make(map[string]map[string]string),
1✔
555
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
1✔
556
                hostFabricPathDnMap:         make(map[string]hostFabricInfo),
1✔
557
                nsRemoteIpCont:              make(map[string]remoteIpConts),
1✔
558
                sharedAaepMonitor:           make(map[string]map[string]*AaepEpgAttachData),
1✔
559
        }
1✔
560
        cont.syncProcessors = map[string]func() bool{
1✔
561
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
562
                "rdConfig":       cont.syncRdConfig,
1✔
563
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
564
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
565
                   and aci-containers-operator images for istio.io/istio package
1✔
566

1✔
567
                "istioCR":        cont.createIstioCR,
1✔
568
                */
1✔
569
        }
1✔
570
        return cont
1✔
571
}
1✔
572

573
func (cont *AciController) Init() {
×
574
        if cont.config.ChainedMode {
×
575
                cont.log.Info("In chained mode")
×
576
        }
×
577
        if cont.config.VmmLite {
×
578
                cont.log.Info("In VMM lite mode")
×
579
        }
×
580

581
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
582
        if err != nil {
×
583
                cont.log.Error("Could not serialize default endpoint group")
×
584
                panic(err.Error())
×
585
        }
586
        cont.defaultEg = string(egdata)
×
587

×
588
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
589
        if err != nil {
×
590
                cont.log.Error("Could not serialize default security groups")
×
591
                panic(err.Error())
×
592
        }
593
        cont.defaultSg = string(sgdata)
×
594

×
595
        cont.log.Debug("Initializing IPAM")
×
596
        cont.initIpam()
×
597
        // check if the cluster supports endpoint slices
×
598
        // if cluster doesn't have the support fallback to endpoints
×
599
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
600
        if util.IsEndPointSlicesSupported(kubeClient) {
×
601
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
602
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
603
                cont.log.Info("Initializing ServiceEndpointSlices")
×
604
        } else {
×
605
                cont.serviceEndPoints = &serviceEndpoint{}
×
606
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
607
                cont.log.Info("Initializing ServiceEndpoints")
×
608
        }
×
609

610
        err = cont.env.Init(cont)
×
611
        if err != nil {
×
612
                panic(err.Error())
×
613
        }
614
}
615

616
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
617
        store cache.Store, handler func(interface{}) bool,
618
        deleteHandler func(string) bool,
619
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
620
        go wait.Until(func() {
2✔
621
                for {
2✔
622
                        key, quit := queue.Get()
1✔
623
                        if quit {
2✔
624
                                break
1✔
625
                        }
626

627
                        var requeue bool
1✔
628
                        switch key := key.(type) {
1✔
629
                        case chan struct{}:
×
630
                                close(key)
×
631
                        case string:
1✔
632
                                if strings.HasPrefix(key, "DELETED_") {
2✔
633
                                        delKey := strings.Trim(key, "DELETED_")
1✔
634
                                        requeue = deleteHandler(delKey)
1✔
635
                                } else {
2✔
636
                                        obj, exists, err := store.GetByKey(key)
1✔
637
                                        if err != nil {
1✔
638
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
639
                                        }
×
640
                                        //Handle Add/Update/Delete
641
                                        if exists && handler != nil {
2✔
642
                                                requeue = handler(obj)
1✔
643
                                        }
1✔
644
                                        //Handle Post Delete
645
                                        if !exists && postDelHandler != nil {
1✔
646
                                                requeue = postDelHandler()
×
647
                                        }
×
648
                                }
649
                        }
650
                        if requeue {
2✔
651
                                queue.AddRateLimited(key)
1✔
652
                        } else {
2✔
653
                                queue.Forget(key)
1✔
654
                        }
1✔
655
                        queue.Done(key)
1✔
656
                }
657
        }, time.Second, stopCh)
658
        <-stopCh
1✔
659
        queue.ShutDown()
1✔
660
}
661

662
func (cont *AciController) processRemIpContQueue(queue workqueue.RateLimitingInterface,
663
        handler func(interface{}) bool,
664
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
665
        go wait.Until(func() {
2✔
666
                for {
2✔
667
                        key, quit := queue.Get()
1✔
668
                        if quit {
2✔
669
                                break
1✔
670
                        }
671

672
                        var requeue bool
1✔
673
                        switch key := key.(type) {
1✔
674
                        case chan struct{}:
×
675
                                close(key)
×
676
                        case string:
1✔
677
                                if handler != nil {
2✔
678
                                        requeue = handler(key)
1✔
679
                                }
1✔
680
                                if postDelHandler != nil {
2✔
681
                                        requeue = postDelHandler()
1✔
682
                                }
1✔
683
                        }
684
                        if requeue {
1✔
685
                                queue.AddRateLimited(key)
×
686
                        } else {
1✔
687
                                queue.Forget(key)
1✔
688
                        }
1✔
689
                        queue.Done(key)
1✔
690

691
                }
692
        }, time.Second, stopCh)
693
        <-stopCh
1✔
694
        queue.ShutDown()
1✔
695
}
696

697
func (cont *AciController) processEpgDnCacheUpdateQueue(queue workqueue.RateLimitingInterface,
698
        handler func(interface{}) bool,
699
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
700
        go wait.Until(func() {
2✔
701
                for {
2✔
702
                        key, quit := queue.Get()
1✔
703
                        if quit {
2✔
704
                                break
1✔
705
                        }
706

707
                        var requeue bool
1✔
708
                        switch key := key.(type) {
1✔
709
                        case chan struct{}:
×
710
                                close(key)
×
711
                        case bool:
1✔
712
                                if handler != nil {
2✔
713
                                        requeue = handler(key)
1✔
714
                                }
1✔
715
                                if postDelHandler != nil {
1✔
716
                                        requeue = postDelHandler()
×
717
                                }
×
718
                        }
719
                        if requeue {
1✔
720
                                queue.AddRateLimited(key)
×
721
                        } else {
1✔
722
                                queue.Forget(key)
1✔
723
                        }
1✔
724
                        queue.Done(key)
1✔
725

726
                }
727
        }, time.Second, stopCh)
728
        <-stopCh
1✔
729
        queue.ShutDown()
1✔
730
}
731

732
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
733
        return apicapi.ApicSlice{}
1✔
734
}
1✔
735

736
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
737
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
738
}
1✔
739

740
func (cont *AciController) initStaticObjs() {
1✔
741
        cont.env.InitStaticAciObjects()
1✔
742
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
743
                cont.globalStaticObjs())
1✔
744
}
1✔
745

746
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
747
        vmmProv = "Kubernetes"
1✔
748
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
749
                vmmProv = "OpenShift"
×
750
        }
×
751
        return
1✔
752
}
753

754
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
755
        var err error
1✔
756
        var privKey []byte
1✔
757
        var apicCert []byte
1✔
758

1✔
759
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
760

1✔
761
        if cont.config.ApicPrivateKeyPath != "" {
1✔
762
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
763
                if err != nil {
×
764
                        panic(err)
×
765
                }
766
        }
767
        if cont.config.ApicCertPath != "" {
1✔
768
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
769
                if err != nil {
×
770
                        panic(err)
×
771
                }
772
        }
773
        // If not defined, default is 1800
774
        if cont.config.ApicRefreshTimer == "" {
2✔
775
                cont.config.ApicRefreshTimer = "1800"
1✔
776
        }
1✔
777
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
778
        if err != nil {
1✔
779
                panic(err)
×
780
        }
781
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
782

1✔
783
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
784
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
785
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
786
                panic(err)
×
787
        }
788

789
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
790
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
791
                cont.config.ApicRefreshTickerAdjust = "210"
1✔
792
        }
1✔
793
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
794
        if err != nil {
1✔
795
                panic(err)
×
796
        }
797

798
        //If ApicSubscriptionDelay is not defined, default to 100ms
799
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
800
                cont.config.ApicSubscriptionDelay = 100
1✔
801
        }
1✔
802
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
803

1✔
804
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
805
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
806
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
807
        }
1✔
808

809
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
810
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
811
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
812
        }
1✔
813
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
814

1✔
815
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
816
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
817
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
818
        }
1✔
819

820
        // If not defined, default to 32
821
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
822
                cont.config.PodIpPoolChunkSize = 32
1✔
823
        }
1✔
824
        if !cont.isCNOEnabled() {
2✔
825
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
826
        }
1✔
827

828
        // If ApicConnectionRetryLimit is not defined, default to 5
829
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
830
                cont.config.ApicConnectionRetryLimit = 5
1✔
831
        }
1✔
832
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
833

1✔
834
        // If not valid, default to 5000-65000
1✔
835
        // other permissible values 1-65000
1✔
836
        defStart := 5000
1✔
837
        defEnd := 65000
1✔
838
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
839
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
840
        }
1✔
841
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
842
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
843
        }
1✔
844
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
845
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
846
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
847
                cont.config.SnatDefaultPortRangeStart = defStart
×
848
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
849
        }
×
850

851
        // Set default value for pbr programming delay if services list is not empty
852
        // and delay value is empty
853
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
854
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
855
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
856
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
857
        }
×
858
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
859
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
860
        }
×
861

862
        // Set contract scope for snat svc graph to global by default
863
        if cont.config.SnatSvcContractScope == "" {
2✔
864
                cont.config.SnatSvcContractScope = "global"
1✔
865
        }
1✔
866
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
867
                cont.config.MaxSvcGraphNodes = 32
1✔
868
        }
1✔
869
        if !cont.isCNOEnabled() {
2✔
870
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
871
        }
1✔
872
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
873
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
874
                privKey, apicCert, cont.config.AciPrefix,
1✔
875
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
876
                cont.config.AciVrfTenant, cont.UpdateLLDPIfLocked)
1✔
877
        if err != nil {
1✔
878
                panic(err)
×
879
        }
880

881
        cont.apicConn.FilterOpflexDevice = cont.config.FilterOpflexDevice
1✔
882
        cont.apicConn.Flavor = cont.config.Flavor
1✔
883
        cont.apicConn.VmmDomain = cont.config.AciVmmDomain
1✔
884
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
885
        cont.apicConn.RequestRetryDelay = cont.config.ApicRequestRetryDelay
1✔
886
        cont.apicConn.EnableRequestRetry = cont.config.EnableApicRequestRetry
1✔
887

1✔
888
        if len(cont.config.ApicHosts) != 0 {
1✔
889
        APIC_SWITCH:
×
890
                cont.log.WithFields(logrus.Fields{
×
891
                        "mod":  "APICAPI",
×
892
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
893
                }).Debug("Connecting to APIC to determine the Version")
×
894

×
895
                version, err := cont.apicConn.GetVersion()
×
896
                if err != nil {
×
897
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
898
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
899
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
900
                        goto APIC_SWITCH
×
901
                }
902
                cont.apicConn.CachedVersion = version
×
903
                apicapi.ApicVersion = version
×
904
                if version >= "4.2(4i)" {
×
905
                        cont.apicConn.SnatPbrFltrChain = true
×
906
                } else {
×
907
                        cont.apicConn.SnatPbrFltrChain = false
×
908
                }
×
909
                if version >= "5.2" {
×
910
                        cont.vmmClusterFaultSupported = true
×
911
                }
×
912
        } else { // For unit-tests
1✔
913
                cont.apicConn.SnatPbrFltrChain = true
1✔
914
        }
1✔
915

916
        if !cont.isCNOEnabled() {
2✔
917
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
918
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
919
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
920
                        var expectedVrfRelations []string
×
921
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
922
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
923
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
924
                        if err != nil {
×
925
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
926
                                panic(err)
×
927
                        }
928
                }
929
        }
930

931
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.isCNOEnabled() {
1✔
932
                //Clear fault instances when the controller starts
×
933
                cont.clearFaultInstances()
×
934
                //Subscribe for vmmEpPD for a given domain
×
935
                var tnTargetFilterEpg string
×
936
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
937
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
938
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
939
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
940
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
941
                        func(obj apicapi.ApicObject) bool {
×
942
                                cont.vmmEpPDChanged(obj)
×
943
                                return true
×
944
                        },
×
945
                        func(dn string) {
×
946
                                cont.vmmEpPDDeleted(dn)
×
947
                        })
×
948
        }
949

950
        cont.initStaticObjs()
1✔
951

1✔
952
        err = cont.env.PrepareRun(stopCh)
1✔
953
        if err != nil {
1✔
954
                panic(err.Error())
×
955
        }
956

957
        cont.apicConn.FullSyncHook = func() {
1✔
958
                // put a channel into each work queue and wait on it to
×
959
                // checkpoint object syncing in response to new subscription
×
960
                // updates
×
961
                cont.log.Debug("Starting checkpoint")
×
962
                var chans []chan struct{}
×
963
                qs := make([]workqueue.RateLimitingInterface, 0)
×
964
                _, ok := cont.env.(*K8sEnvironment)
×
965
                if ok {
×
966
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
967
                        if !cont.isCNOEnabled() {
×
968
                                if !cont.config.DisableHppRendering {
×
969
                                        qs = append(qs, cont.netPolQueue)
×
970
                                }
×
971
                                if cont.config.EnableHppDirect {
×
972
                                        qs = append(qs, cont.remIpContQueue)
×
973
                                }
×
974
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
975
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
976
                                        cont.rdConfigQueue, cont.erspanQueue,
×
977
                                        cont.epgDnCacheUpdateQueue)
×
978
                        }
979
                }
980
                for _, q := range qs {
×
981
                        c := make(chan struct{})
×
982
                        chans = append(chans, c)
×
983
                        q.Add(c)
×
984
                }
×
985
                for _, c := range chans {
×
986
                        <-c
×
987
                }
×
988
                cont.log.Debug("Checkpoint complete")
×
989
        }
990

991
        if len(cont.config.ApicHosts) != 0 && !cont.isCNOEnabled() {
1✔
992
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
993
                cont.scheduleRdConfig()
×
994
                if strings.Contains(cont.config.Flavor, "openstack") {
×
995
                        cont.setOpenStackSystemId()
×
996
                }
×
997
        }
998

999
        if !cont.isCNOEnabled() {
2✔
1000
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
1001
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1002
                                []string{"hostprotPol"})
1✔
1003
                }
1✔
1004
        } else {
1✔
1005
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1006
                        []string{"fvBD", "fvAp"})
1✔
1007
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
1008
                        []string{"fvnsVlanInstP"}, "")
1✔
1009
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
1010
                        []string{"infraRsDomP"}, "")
1✔
1011
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
1012
                        []string{"physDomP"}, "")
1✔
1013
                cont.apicConn.AddSubscriptionClass("l3extDomP",
1✔
1014
                        []string{"l3extDomP"}, "")
1✔
1015
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
1016
                        []string{"infraRsVlanNs"}, "")
1✔
1017
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
1018
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
1019
                cont.apicConn.AddSubscriptionClass("l3extOut",
1✔
1020
                        []string{"l3extInstP", "l3extSubnet", "fvRsCons", "fvRsProv", "l3extRsEctx", "l3extRsL3DomAtt", "l3extLNodeP", "l3extRsNodeL3OutAtt", "ipRouteP", "ipNexthopP", "l3extLIfP", "l3extVirtualLIfP", "l3extRsDynPathAtt",
1✔
1021
                                "l3extRsPathL3OutAtt", "l3extMember", "l3extIp", "bgpExtP", "bgpPeerP", "bgpAsP", "bgpLocalAsnP", "bgpRsPeerPfxPol"}, "")
1✔
1022
                cont.apicConn.AddSubscriptionClass("bgpPeerPfxPol",
1✔
1023
                        []string{"bgpPeerPfxPol"}, "")
1✔
1024
        }
1✔
1025

1026
        if cont.config.VmmLite {
1✔
1027
                cont.apicConn.AddSubscriptionClass("infraAttEntityP",
×
1028
                        []string{"infraRsFuncToEpg"}, "")
×
1029

×
1030
                cont.apicConn.SetSubscriptionHooks(
×
1031
                        "infraAttEntityP",
×
1032
                        func(obj apicapi.ApicObject) bool {
×
1033
                                cont.log.Debug("EPG attached to AAEP")
×
1034
                                cont.handleAaepEpgAttach(obj)
×
1035
                                return true
×
1036
                        },
×
1037
                        func(dn string) {
×
1038
                                cont.log.Debug("EPG detached from AAEP")
×
1039
                                cont.handleAaepEpgDetach(dn)
×
1040
                        },
×
1041
                )
1042
        }
1043

1044
        if !cont.isCNOEnabled() {
2✔
1045
                // When a new class is added for subscriptio, check if its name attribute
1✔
1046
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
1047
                // in apicapi.go
1✔
1048
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
1049
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
1050
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
1051
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
1052
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
1053
                }
×
1054
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
1055
                        subscribeMo)
1✔
1056
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
1057
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
1058
                        []string{"fvRsCons"})
1✔
1059
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
1060
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
1061
                        cont.config.AciVmmController)
1✔
1062
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
1063
                // Since it is not supported for APIC versions < "5.0"
1✔
1064
                cont.addVmmInjectedLabel()
1✔
1065
                cont.apicConn.AddSyncDn(vmmDn,
1✔
1066
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
1067

1✔
1068
                var tnTargetFilter string
1✔
1069
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
1070
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
1071
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
1072
                        }
×
1073
                } else {
1✔
1074
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
1075
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
1076
                }
1✔
1077
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
1078
                        tnTargetFilter)
1✔
1079
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
1080
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
1081

1✔
1082
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
1083
                        func(obj apicapi.ApicObject) bool {
1✔
1084
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
1085
                                return true
×
1086
                        },
×
1087
                        func(dn string) {
×
1088
                                cont.SubnetDeleted(dn)
×
1089
                        })
×
1090

1091
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
1092
                        []string{"opflexODev"}, "")
1✔
1093

1✔
1094
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
1095
                        func(obj apicapi.ApicObject) bool {
1✔
1096
                                cont.opflexDeviceChanged(obj)
×
1097
                                return true
×
1098
                        },
×
1099
                        func(dn string) {
×
1100
                                cont.opflexDeviceDeleted(dn)
×
1101
                        })
×
1102

1103
                if !cont.config.DisableServiceVlanPreprovisioning && !strings.Contains(cont.config.Flavor, "openstack") {
2✔
1104
                        if cont.config.AEP == "" {
2✔
1105
                                cont.log.Error("AEP is missing in configuration, preprovisioning of service vlan will be disabled")
1✔
1106
                        } else {
1✔
1107
                                infraRtAttEntPFilter := fmt.Sprintf("and(wcard(infraRtAttEntP.dn,\"/attentp-%s/\"))", cont.config.AEP)
×
1108
                                cont.apicConn.AddSubscriptionClass("infraRtAttEntP",
×
1109
                                        []string{"infraRtAttEntP"}, infraRtAttEntPFilter)
×
1110

×
1111
                                // For bare metal, the infraRtAttEntP associated with an AEP will be empty.
×
1112
                                // We should not receive any updates for such cases.
×
1113
                                cont.apicConn.SetSubscriptionHooks("infraRtAttEntP",
×
1114
                                        func(obj apicapi.ApicObject) bool {
×
1115
                                                cont.infraRtAttEntPChanged(obj)
×
1116
                                                return true
×
1117
                                        },
×
1118
                                        func(dn string) {
×
1119
                                                cont.infraRtAttEntPDeleted(dn)
×
1120
                                        })
×
1121

1122
                                cont.apicConn.AddSubscriptionClass("vpcIf",
×
1123
                                        []string{"vpcIf"}, "")
×
1124

×
1125
                                cont.apicConn.SetSubscriptionHooks("vpcIf",
×
1126
                                        func(obj apicapi.ApicObject) bool {
×
1127
                                                cont.vpcIfChanged(obj)
×
1128
                                                return true
×
1129
                                        },
×
1130
                                        func(dn string) {
×
1131
                                                cont.vpcIfDeleted(dn)
×
1132
                                        })
×
1133
                        }
1134
                }
1135

1136
                cont.apicConn.VersionUpdateHook =
1✔
1137
                        func() {
1✔
1138
                                cont.initStaticServiceObjs()
×
1139
                        }
×
1140
        } else if cont.config.VmmLite {
1✔
1141
                cont.apicConn.VMMLiteSyncHook = func() {
×
1142
                        cont.syncAndCleanNadCache()
×
NEW
1143
                        cont.syncAndCleanNads()
×
UNCOV
1144
                }
×
1145
        }
1146
        go cont.apicConn.Run(stopCh)
1✔
1147
}
1148

1149
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1150
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
1151
        ticker := time.NewTicker(seconds * time.Second)
1✔
1152
        defer ticker.Stop()
1✔
1153

1✔
1154
        for {
2✔
1155
                select {
1✔
1156
                case <-ticker.C:
1✔
1157
                        if cont.config.EnableOpflexAgentReconnect {
1✔
1158
                                cont.checkChangeOfOpflexOdevAciPod()
×
1159
                        }
×
1160
                        if cont.config.AciMultipod {
1✔
1161
                                cont.checkChangeOfOdevAciPod()
×
1162
                        }
×
1163
                case <-stopCh:
1✔
1164
                        return
1✔
1165
                }
1166
        }
1167
}
1168

1169
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1170
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
1171
        ticker := time.NewTicker(seconds * time.Second)
1✔
1172
        defer ticker.Stop()
1✔
1173

1✔
1174
        for {
2✔
1175
                select {
1✔
1176
                case <-ticker.C:
1✔
1177
                        cont.deleteOldOpflexDevices()
1✔
1178
                case <-stopCh:
1✔
1179
                        return
1✔
1180
                }
1181
        }
1182
}
1183

1184
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1185
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
1186
        ticker := time.NewTicker(seconds * time.Second)
1✔
1187
        defer ticker.Stop()
1✔
1188

1✔
1189
        for {
2✔
1190
                select {
1✔
1191
                case <-ticker.C:
1✔
1192
                        cont.processDelayedEpSlices()
1✔
1193
                case <-stopCh:
1✔
1194
                        return
1✔
1195
                }
1196
        }
1197
}
1198

1199
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
1200
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
1201
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
1202
        iteration := 0
1✔
1203
        for {
2✔
1204
                // To avoid noisy logs, only printing once in 5 minutes
1✔
1205
                if iteration%5 == 0 {
2✔
1206
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
1207
                }
1✔
1208
                var nodeInfos []*nodeinfo.NodeInfo
1✔
1209
                cont.indexMutex.Lock()
1✔
1210
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
1211
                        func(nodeInfoObj interface{}) {
2✔
1212
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
1213
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
1214
                        })
1✔
1215
                expectedmap := make(map[string]map[string]bool)
1✔
1216
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
1217
                        for nodename, entry := range glinfo {
2✔
1218
                                if _, found := expectedmap[nodename]; !found {
2✔
1219
                                        newentry := make(map[string]bool)
1✔
1220
                                        newentry[entry.SnatPolicyName] = true
1✔
1221
                                        expectedmap[nodename] = newentry
1✔
1222
                                } else {
2✔
1223
                                        currententry := expectedmap[nodename]
1✔
1224
                                        currententry[entry.SnatPolicyName] = true
1✔
1225
                                        expectedmap[nodename] = currententry
1✔
1226
                                }
1✔
1227
                        }
1228
                }
1229
                cont.indexMutex.Unlock()
1✔
1230

1✔
1231
                for _, value := range nodeInfos {
2✔
1232
                        marked := false
1✔
1233
                        policyNames := value.Spec.SnatPolicyNames
1✔
1234
                        nodeName := value.ObjectMeta.Name
1✔
1235
                        _, ok := expectedmap[nodeName]
1✔
1236
                        if !ok && len(policyNames) > 0 {
2✔
1237
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
1238
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
1239
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1240
                                marked = true
1✔
1241
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
1242
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1243
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1244
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1245
                                marked = true
1✔
1246
                        } else {
2✔
1247
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
1248
                                        // No snatpolicies present
×
1249
                                        continue
×
1250
                                }
1251
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
1252
                                if !eq {
2✔
1253
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1254
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1255
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1256
                                        marked = true
1✔
1257
                                }
1✔
1258
                        }
1259
                        if marked {
2✔
1260
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
1261
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
1262
                                if err != nil {
1✔
1263
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
1264
                                        continue
×
1265
                                }
1266
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1267
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1268
                        } else if iteration%5 == 0 {
2✔
1269
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1270
                        }
1✔
1271
                }
1272
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1273
                iteration++
1✔
1274
        }
1275
}
1276

1277
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1278
        queueStop <-chan struct{}) {
1✔
1279
        go wait.Until(func() {
2✔
1280
                for {
2✔
1281
                        syncType, quit := queue.Get()
1✔
1282
                        if quit {
2✔
1283
                                break
1✔
1284
                        }
1285
                        var requeue bool
1✔
1286
                        if sType, ok := syncType.(string); ok {
2✔
1287
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1288
                                        requeue = f()
1✔
1289
                                }
1✔
1290
                        }
1291
                        if requeue {
1✔
1292
                                queue.AddRateLimited(syncType)
×
1293
                        } else {
1✔
1294
                                queue.Forget(syncType)
1✔
1295
                        }
1✔
1296
                        queue.Done(syncType)
1✔
1297
                }
1298
        }, time.Second, queueStop)
1299
        <-queueStop
1✔
1300
        queue.ShutDown()
1✔
1301
}
1302

1303
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1304
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1305
}
1✔
1306
func (cont *AciController) scheduleRdConfig() {
×
1307
        cont.syncQueue.AddRateLimited("rdConfig")
×
1308
}
×
1309
func (cont *AciController) scheduleCreateIstioCR() {
×
1310
        cont.syncQueue.AddRateLimited("istioCR")
×
1311
}
×
1312

1313
func (cont *AciController) addVmmInjectedLabel() {
1✔
1314
        if apicapi.ApicVersion >= "5.2" {
1✔
1315
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1316
                if err != nil {
×
1317
                        panic(err.Error())
×
1318
                }
1319
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1320
                if err != nil {
×
1321
                        panic(err.Error())
×
1322
                }
1323
        }
1324
        if apicapi.ApicVersion >= "5.0" {
2✔
1325
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1326
                if err != nil {
1✔
1327
                        panic(err.Error())
×
1328
                }
1329
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1330
                if err != nil {
1✔
1331
                        panic(err.Error())
×
1332
                }
1333
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1334
                if err != nil {
1✔
1335
                        panic(err.Error())
×
1336
                }
1337
        }
1338
}
1339

1340
func (cont *AciController) isCNOEnabled() bool {
1✔
1341
        return cont.config.ChainedMode || cont.config.VmmLite
1✔
1342
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc