• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11429

19 Nov 2025 10:33AM UTC coverage: 63.209% (-0.4%) from 63.619%
11429

push

travis-pro

web-flow
Merge pull request #1642 from noironetworks/improve-dhcp-logic

Improve VLAN IP release and request behavior

0 of 169 new or added lines in 1 file covered. (0.0%)

569 existing lines in 7 files now uncovered.

13375 of 21160 relevant lines covered (63.21%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.37
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        v1net "k8s.io/api/networking/v1"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/util/workqueue"
40

41
        "github.com/noironetworks/aci-containers/pkg/apicapi"
42
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
43
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
44
        hppv1 "github.com/noironetworks/aci-containers/pkg/hpp/apis/aci.hpp/v1"
45
        "github.com/noironetworks/aci-containers/pkg/index"
46
        "github.com/noironetworks/aci-containers/pkg/ipam"
47
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
48
        "github.com/noironetworks/aci-containers/pkg/metadata"
49
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
50
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
51
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
52
        "github.com/noironetworks/aci-containers/pkg/util"
53
)
54

55
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
56
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
57
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
58
type listNetworkPoliciesFunc func(string) (*v1net.NetworkPolicyList, error)
59
type listNamespacesFunc func() (*v1.NamespaceList, error)
60

61
type AciController struct {
62
        log    *logrus.Logger
63
        config *ControllerConfig
64
        env    Environment
65

66
        defaultEg string
67
        defaultSg string
68

69
        unitTestMode bool
70

71
        podQueue               workqueue.RateLimitingInterface
72
        netPolQueue            workqueue.RateLimitingInterface
73
        qosQueue               workqueue.RateLimitingInterface
74
        serviceQueue           workqueue.RateLimitingInterface
75
        snatQueue              workqueue.RateLimitingInterface
76
        netflowQueue           workqueue.RateLimitingInterface
77
        erspanQueue            workqueue.RateLimitingInterface
78
        snatNodeInfoQueue      workqueue.RateLimitingInterface
79
        rdConfigQueue          workqueue.RateLimitingInterface
80
        istioQueue             workqueue.RateLimitingInterface
81
        nodeFabNetAttQueue     workqueue.RateLimitingInterface
82
        netFabConfigQueue      workqueue.RateLimitingInterface
83
        nadVlanMapQueue        workqueue.RateLimitingInterface
84
        fabricVlanPoolQueue    workqueue.RateLimitingInterface
85
        netFabL3ConfigQueue    workqueue.RateLimitingInterface
86
        remIpContQueue         workqueue.RateLimitingInterface
87
        epgDnCacheUpdateQueue  workqueue.RateLimitingInterface
88
        aaepMonitorConfigQueue workqueue.RateLimitingInterface
89

90
        namespaceIndexer                     cache.Indexer
91
        namespaceInformer                    cache.Controller
92
        podIndexer                           cache.Indexer
93
        podInformer                          cache.Controller
94
        endpointsIndexer                     cache.Indexer
95
        endpointsInformer                    cache.Controller
96
        serviceIndexer                       cache.Indexer
97
        serviceInformer                      cache.Controller
98
        replicaSetIndexer                    cache.Indexer
99
        replicaSetInformer                   cache.Controller
100
        deploymentIndexer                    cache.Indexer
101
        deploymentInformer                   cache.Controller
102
        nodeIndexer                          cache.Indexer
103
        nodeInformer                         cache.Controller
104
        networkPolicyIndexer                 cache.Indexer
105
        networkPolicyInformer                cache.Controller
106
        snatIndexer                          cache.Indexer
107
        snatInformer                         cache.Controller
108
        snatNodeInfoIndexer                  cache.Indexer
109
        snatNodeInformer                     cache.Controller
110
        snatLocalInfoInformer                cache.Controller
111
        crdInformer                          cache.Controller
112
        rdConfigInformer                     cache.Controller
113
        rdConfigIndexer                      cache.Indexer
114
        qosIndexer                           cache.Indexer
115
        qosInformer                          cache.Controller
116
        netflowIndexer                       cache.Indexer
117
        netflowInformer                      cache.Controller
118
        erspanIndexer                        cache.Indexer
119
        erspanInformer                       cache.Controller
120
        nodePodIfIndexer                     cache.Indexer
121
        nodePodIfInformer                    cache.Controller
122
        istioIndexer                         cache.Indexer
123
        istioInformer                        cache.Controller
124
        endpointSliceIndexer                 cache.Indexer
125
        endpointSliceInformer                cache.Controller
126
        snatCfgInformer                      cache.Controller
127
        updatePod                            podUpdateFunc
128
        updateNode                           nodeUpdateFunc
129
        updateServiceStatus                  serviceUpdateFunc
130
        listNetworkPolicies                  listNetworkPoliciesFunc
131
        listNamespaces                       listNamespacesFunc
132
        nodeFabNetAttInformer                cache.SharedIndexInformer
133
        netFabConfigInformer                 cache.SharedIndexInformer
134
        nadVlanMapInformer                   cache.SharedIndexInformer
135
        fabricVlanPoolInformer               cache.SharedIndexInformer
136
        networkFabricL3ConfigurationInformer cache.SharedIndexInformer
137
        fabNetAttClient                      *fabattclset.Clientset
138
        proactiveConfInformer                cache.SharedIndexInformer
139
        aaepMonitorInformer                  cache.SharedIndexInformer
140
        poster                               *EventPoster
141

142
        indexMutex sync.Mutex
143
        hppMutex   sync.Mutex
144

145
        configuredPodNetworkIps *netIps
146
        podNetworkIps           *netIps
147
        serviceIps              *ipam.IpCache
148
        staticServiceIps        *netIps
149
        nodeServiceIps          *netIps
150

151
        // index of pods matched by deployments
152
        depPods *index.PodSelectorIndex
153
        // index of pods matched by network policies
154
        netPolPods *index.PodSelectorIndex
155
        // index of pods matched by network policy ingress rules
156
        netPolIngressPods *index.PodSelectorIndex
157
        // index of pods matched by network policy egress rules
158
        netPolEgressPods *index.PodSelectorIndex
159
        // index of IP addresses contained in endpoints objects
160
        endpointsIpIndex cidranger.Ranger
161
        // index of service target ports
162
        targetPortIndex map[string]*portIndexEntry
163
        // index of ip blocks referenced by network policy egress rules
164
        netPolSubnetIndex cidranger.Ranger
165
        // index of pods matched by erspan policies
166
        erspanPolPods *index.PodSelectorIndex
167

168
        apicConn *apicapi.ApicConnection
169

170
        nodeServiceMetaCache map[string]*nodeServiceMeta
171
        nodeACIPod           map[string]aciPodAnnot
172
        nodeACIPodAnnot      map[string]aciPodAnnot
173
        nodeOpflexDevice     map[string]apicapi.ApicSlice
174
        nodePodNetCache      map[string]*nodePodNetMeta
175
        serviceMetaCache     map[string]*serviceMeta
176
        snatPolicyCache      map[string]*ContSnatPolicy
177
        delayedEpSlices      []*DelayedEpSlice
178
        snatServices         map[string]bool
179
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
180
        rdConfigCache        map[string]*rdConfig.RdConfig
181
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
182
        istioCache           map[string]*istiov1.AciIstioOperator
183
        podIftoEp            map[string]*EndPointData
184
        // Node Name and Policy Name
185
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
186
        nodeSyncEnabled     bool
187
        serviceSyncEnabled  bool
188
        snatSyncEnabled     bool
189
        syncQueue           workqueue.RateLimitingInterface
190
        syncProcessors      map[string]func() bool
191
        serviceEndPoints    ServiceEndPointType
192
        crdHandlers         map[string]func(*AciController, <-chan struct{})
193
        stopCh              <-chan struct{}
194
        //index of containerportname to ctrPortNameEntry
195
        ctrPortNameCache map[string]*ctrPortNameEntry
196
        // named networkPolicies
197
        nmPortNp map[string]bool
198
        //maps network policy hash to hpp
199
        hppRef map[string]hppReference
200
        //map for ns to remoteIpConts
201
        nsRemoteIpCont map[string]remoteIpConts
202
        // cache to look for Epg DNs which are bound to Vmm domain
203
        cachedEpgDns             []string
204
        vmmClusterFaultSupported bool
205
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
206
        //Used in Shared mode
207
        sharedEncapCache       map[int]*sharedEncapData
208
        sharedEncapAepCache    map[string]map[int]bool
209
        sharedEncapSviCache    map[int]*NfL3Data
210
        sharedEncapVrfCache    map[string]*NfVrfData
211
        sharedEncapTenantCache map[string]*NfTenantData
212
        nfl3configGenerationId int64
213
        // vlan to propertiesList
214
        sharedEncapNfcCache         map[int]*NfcData
215
        sharedEncapNfcVlanMap       map[int]*NfcData
216
        sharedEncapNfcLabelMap      map[string]*NfcData
217
        sharedEncapNfcAppProfileMap map[string]bool
218
        // nadVlanMap encapLabel to vlan
219
        sharedEncapLabelMap      map[string][]int
220
        lldpIfCache              map[string]*NfLLDPIfData
221
        globalVlanConfig         globalVlanConfig
222
        fabricVlanPoolMap        map[string]map[string]string
223
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
224
        hostFabricPathDnMap      map[string]hostFabricInfo
225
        openStackSystemId        string
226
        sharedAaepMonitor        map[string]map[string]*AaepEpgAttachData
227
}
228

229
// hostFabricInfo is the value type of AciController.hostFabricPathDnMap.
type hostFabricInfo struct {
	fabricPathDn string
	host         string
	// vpcIfDn is a set of strings (the empty-struct values carry no data).
	vpcIfDn map[string]struct{}
}
234

235
// NfLLDPIfData is the value type of AciController.lldpIfCache.
type NfLLDPIfData struct {
	LLDPIf string
	// Refs is managed at the NAD level for now: tracking at a finer
	// granularity would add interface-tracking complexity without
	// sufficient benefit.
	Refs map[string]bool
}
242

243
type NfL3OutData struct {
244
        // +kubebuilder:validation:Enum:"import"
245
        RtCtrl     string
246
        PodId      int
247
        RtrNodeMap map[int]*fabattv1.FabricL3OutRtrNode
248
        ExtEpgMap  map[string]*fabattv1.PolicyPrefixGroup
249
        SviMap     map[int]bool
250
}
251

252
type NfTenantData struct {
253
        CommonTenant     bool
254
        L3OutConfig      map[string]*NfL3OutData
255
        BGPPeerPfxConfig map[string]*fabattv1.BGPPeerPrefixPolicy
256
}
257

258
type NfVrfData struct {
259
        TenantConfig map[string]*NfTenantData
260
}
261

262
type NfL3Networks struct {
263
        fabattv1.PrimaryNetwork
264
        Subnets map[string]*fabattv1.FabricL3Subnet
265
}
266

267
type NfL3Data struct {
268
        Tenant      string
269
        Vrf         fabattv1.VRF
270
        PodId       int
271
        ConnectedNw *NfL3Networks
272
        NetAddr     map[string]*RoutedNetworkData
273
        Nodes       map[int]fabattv1.FabricL3OutNode
274
}
275

276
type (
	// remoteIpConts maps a pod name to that pod's remoteIpCont.
	remoteIpConts map[string]remoteIpCont

	// remoteIpCont maps an IP to the pod's labels.
	remoteIpCont map[string]map[string]string
)
281

282
type NfcData struct {
283
        Aeps map[string]bool
284
        Epg  fabattv1.Epg
285
}
286

287
type sharedEncapData struct {
288
        //node to NAD to pods
289
        Pods   map[string]map[string][]string
290
        NetRef map[string]*AdditionalNetworkMeta
291
        Aeps   map[string]bool
292
}
293

294
type globalVlanConfig struct {
295
        SharedPhysDom apicapi.ApicObject
296
        SharedL3Dom   apicapi.ApicObject
297
}
298

299
type hppReference struct {
300
        RefCount uint              `json:"ref-count,omitempty"`
301
        Npkeys   []string          `json:"npkeys,omitempty"`
302
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
303
        HppCr    hppv1.HostprotPol `json:"hpp-cr,omitempty"`
304
}
305

306
type DelayedEpSlice struct {
307
        ServiceKey  string
308
        OldEpSlice  *discovery.EndpointSlice
309
        NewEpSlice  *discovery.EndpointSlice
310
        DelayedTime time.Time
311
}
312

313
// aciPodAnnot is the value type of the AciController.nodeACIPod and
// AciController.nodeACIPodAnnot maps.
type aciPodAnnot struct {
	aciPod string
	// Timestamps; both default to the zero time.
	disconnectTime time.Time
	lastErrorTime  time.Time
}
318

319
type nodeServiceMeta struct {
320
        serviceEp metadata.ServiceEndpoint
321
}
322

323
type nodePodNetMeta struct {
324
        nodePods            map[string]bool
325
        podNetIps           metadata.NetIps
326
        podNetIpsAnnotation string
327
}
328

329
// openstackOpflexOdevInfo is the value type of
// AciController.openStackFabricPathDnMap.
type openstackOpflexOdevInfo struct {
	// opflexODevDn is a set of strings (the empty-struct values carry no
	// data).
	opflexODevDn map[string]struct{}
	fabricPathDn string
}
333

334
// serviceMeta is the value type of AciController.serviceMetaCache. It
// holds three lists of IP addresses for a service.
type serviceMeta struct {
	requestedIps     []net.IP
	ingressIps       []net.IP
	staticIngressIps []net.IP
}
339

340
// ipIndexEntry couples an IP network with a set of string keys. Its
// Network method returns ipNet.
type ipIndexEntry struct {
	ipNet net.IPNet
	// keys is keyed by a string (what each key identifies is not visible
	// in this declaration).
	keys map[string]bool
}
344

345
type targetPort struct {
346
        proto v1.Protocol
347
        ports []int
348
}
349

350
type portIndexEntry struct {
351
        port              targetPort
352
        serviceKeys       map[string]bool
353
        networkPolicyKeys map[string]bool
354
}
355

356
// portRangeSnat holds the two integer bounds of a port range.
// NOTE(review): whether end is inclusive is not visible here; confirm
// against the users of this type.
type portRangeSnat struct {
	start int
	end   int
}
360

361
// EndPointData holds PodIF data in the controller; it is the value type
// of AciController.podIftoEp.
type EndPointData struct {
	MacAddr    string
	EPG        string
	Namespace  string
	AppProfile string
}
368

369
// ctrPortNameEntry is the value type of AciController.ctrPortNameCache,
// which is keyed by container port name.
type ctrPortNameEntry struct {
	// ctrNmpToPods maps proto+port to the set of pods.
	ctrNmpToPods map[string]map[string]bool
}
373

374
// LinkData is the innermost value type of AdditionalNetworkMeta.FabricLink.
// It carries a list of links and a list of pods.
type LinkData struct {
	Link []string
	Pods []string
}
378

379
// RoutedNodeData is the value type of RoutedNetworkData.nodeMap. It
// records an address string and an integer index for a node.
type RoutedNodeData struct {
	addr string
	idx  int
}
383

384
type RoutedNetworkData struct {
385
        subnet       string
386
        netAddress   string
387
        maskLen      int
388
        numAllocated int
389
        maxAddresses int
390
        baseAddress  net.IP
391
        nodeMap      map[string]RoutedNodeData
392
        availableMap map[int]bool
393
}
394

395
type AdditionalNetworkMeta struct {
396
        NetworkName string
397
        EncapVlan   string
398
        //node+localiface->fabricLinks
399
        FabricLink map[string]map[string]LinkData
400
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
401
        Mode       util.EncapMode
402
}
403

404
type ServiceEndPointType interface {
405
        InitClientInformer(kubeClient *kubernetes.Clientset)
406
        Run(stopCh <-chan struct{})
407
        Wait(stopCh <-chan struct{})
408
        UpdateServicesForNode(nodename string)
409
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
410
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
411
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
412
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
413
}
414

415
type serviceEndpoint struct {
416
        cont *AciController
417
}
418
type serviceEndpointSlice struct {
419
        cont *AciController
420
}
421

422
// AaepEpgAttachData is the innermost value type of
// AciController.sharedAaepMonitor.
type AaepEpgAttachData struct {
	encapVlan     int
	nadName       string
	namespaceName string
	// nadCreated is a boolean flag (presumably set once the NAD exists;
	// confirm against the writer).
	nadCreated bool
}
428

429
// EpgVlanMap pairs an EPG DN with an encap VLAN number.
type EpgVlanMap struct {
	epgDn     string
	encapVlan int
}
433

434
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
435
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
436
}
×
437

438
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
439
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
440
}
×
441

442
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
443
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
444
}
1✔
445

446
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
447
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
448
}
1✔
449

450
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
451
        cache.WaitForCacheSync(stopCh,
1✔
452
                sep.cont.endpointsInformer.HasSynced,
1✔
453
                sep.cont.serviceInformer.HasSynced)
1✔
454
}
1✔
455

456
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
457
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
458
        cache.WaitForCacheSync(stopCh,
1✔
459
                seps.cont.endpointSliceInformer.HasSynced,
1✔
460
                seps.cont.serviceInformer.HasSynced)
1✔
461
}
1✔
462

463
func (e *ipIndexEntry) Network() net.IPNet {
1✔
464
        return e.ipNet
1✔
465
}
1✔
466

467
func newNodePodNetMeta() *nodePodNetMeta {
1✔
468
        return &nodePodNetMeta{
1✔
469
                nodePods: make(map[string]bool),
1✔
470
        }
1✔
471
}
1✔
472

473
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
474
        return workqueue.NewNamedRateLimitingQueue(
1✔
475
                workqueue.NewMaxOfRateLimiter(
1✔
476
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
477
                                10*time.Second),
1✔
478
                        &workqueue.BucketRateLimiter{
1✔
479
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
480
                        },
1✔
481
                ),
1✔
482
                "delta")
1✔
483
}
1✔
484

485
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
486
        cont := &AciController{
1✔
487
                log:          log,
1✔
488
                config:       config,
1✔
489
                env:          env,
1✔
490
                defaultEg:    "",
1✔
491
                defaultSg:    "",
1✔
492
                unitTestMode: unittestmode,
1✔
493

1✔
494
                podQueue:               createQueue("pod"),
1✔
495
                netPolQueue:            createQueue("networkPolicy"),
1✔
496
                qosQueue:               createQueue("qos"),
1✔
497
                netflowQueue:           createQueue("netflow"),
1✔
498
                erspanQueue:            createQueue("erspan"),
1✔
499
                serviceQueue:           createQueue("service"),
1✔
500
                snatQueue:              createQueue("snat"),
1✔
501
                snatNodeInfoQueue:      createQueue("snatnodeinfo"),
1✔
502
                rdConfigQueue:          createQueue("rdconfig"),
1✔
503
                istioQueue:             createQueue("istio"),
1✔
504
                nodeFabNetAttQueue:     createQueue("nodefabricnetworkattachment"),
1✔
505
                netFabConfigQueue:      createQueue("networkfabricconfiguration"),
1✔
506
                nadVlanMapQueue:        createQueue("nadvlanmap"),
1✔
507
                fabricVlanPoolQueue:    createQueue("fabricvlanpool"),
1✔
508
                netFabL3ConfigQueue:    createQueue("networkfabricl3configuration"),
1✔
509
                remIpContQueue:         createQueue("remoteIpContainer"),
1✔
510
                epgDnCacheUpdateQueue:  createQueue("epgDnCache"),
1✔
511
                aaepMonitorConfigQueue: createQueue("aaepepgmap"),
1✔
512
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
513
                        &workqueue.BucketRateLimiter{
1✔
514
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
515
                        }, "sync"),
1✔
516

1✔
517
                configuredPodNetworkIps: newNetIps(),
1✔
518
                podNetworkIps:           newNetIps(),
1✔
519
                serviceIps:              ipam.NewIpCache(),
1✔
520
                staticServiceIps:        newNetIps(),
1✔
521
                nodeServiceIps:          newNetIps(),
1✔
522

1✔
523
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
524
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
525
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
526

1✔
527
                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
1✔
528
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
1✔
529
                serviceMetaCache:            make(map[string]*serviceMeta),
1✔
530
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
1✔
531
                snatServices:                make(map[string]bool),
1✔
532
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
1✔
533
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
1✔
534
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
1✔
535
                podIftoEp:                   make(map[string]*EndPointData),
1✔
536
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
537
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
1✔
538
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
1✔
539
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
1✔
540
                nmPortNp:                    make(map[string]bool),
1✔
541
                hppRef:                      make(map[string]hppReference),
1✔
542
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
1✔
543
                sharedEncapCache:            make(map[int]*sharedEncapData),
1✔
544
                sharedEncapAepCache:         make(map[string]map[int]bool),
1✔
545
                sharedEncapSviCache:         make(map[int]*NfL3Data),
1✔
546
                sharedEncapVrfCache:         make(map[string]*NfVrfData),
1✔
547
                sharedEncapTenantCache:      make(map[string]*NfTenantData),
1✔
548
                sharedEncapNfcCache:         make(map[int]*NfcData),
1✔
549
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
1✔
550
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
1✔
551
                sharedEncapNfcAppProfileMap: make(map[string]bool),
1✔
552
                sharedEncapLabelMap:         make(map[string][]int),
1✔
553
                lldpIfCache:                 make(map[string]*NfLLDPIfData),
1✔
554
                fabricVlanPoolMap:           make(map[string]map[string]string),
1✔
555
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
1✔
556
                hostFabricPathDnMap:         make(map[string]hostFabricInfo),
1✔
557
                nsRemoteIpCont:              make(map[string]remoteIpConts),
1✔
558
                sharedAaepMonitor:           make(map[string]map[string]*AaepEpgAttachData),
1✔
559
        }
1✔
560
        cont.syncProcessors = map[string]func() bool{
1✔
561
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
562
                "rdConfig":       cont.syncRdConfig,
1✔
563
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
564
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
565
                   and aci-containers-operator images for istio.io/istio package
1✔
566

1✔
567
                "istioCR":        cont.createIstioCR,
1✔
568
                */
1✔
569
        }
1✔
570
        return cont
1✔
571
}
1✔
572

573
func (cont *AciController) Init() {
×
574
        if cont.config.ChainedMode {
×
575
                cont.log.Info("In chained mode")
×
576
        }
×
577
        if cont.config.VmmLite {
×
578
                cont.log.Info("In VMM lite mode")
×
579
        }
×
580

581
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
582
        if err != nil {
×
583
                cont.log.Error("Could not serialize default endpoint group")
×
584
                panic(err.Error())
×
585
        }
586
        cont.defaultEg = string(egdata)
×
587

×
588
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
589
        if err != nil {
×
590
                cont.log.Error("Could not serialize default security groups")
×
591
                panic(err.Error())
×
592
        }
593
        cont.defaultSg = string(sgdata)
×
594

×
595
        cont.log.Debug("Initializing IPAM")
×
596
        cont.initIpam()
×
597
        // check if the cluster supports endpoint slices
×
598
        // if cluster doesn't have the support fallback to endpoints
×
599
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
600
        if util.IsEndPointSlicesSupported(kubeClient) {
×
601
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
602
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
603
                cont.log.Info("Initializing ServiceEndpointSlices")
×
604
        } else {
×
605
                cont.serviceEndPoints = &serviceEndpoint{}
×
606
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
607
                cont.log.Info("Initializing ServiceEndpoints")
×
608
        }
×
609

610
        err = cont.env.Init(cont)
×
611
        if err != nil {
×
612
                panic(err.Error())
×
613
        }
614
}
615

616
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
617
        store cache.Store, handler func(interface{}) bool,
618
        deleteHandler func(string) bool,
619
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
620
        go wait.Until(func() {
2✔
621
                for {
2✔
622
                        key, quit := queue.Get()
1✔
623
                        if quit {
2✔
624
                                break
1✔
625
                        }
626

627
                        var requeue bool
1✔
628
                        switch key := key.(type) {
1✔
629
                        case chan struct{}:
×
630
                                close(key)
×
631
                        case string:
1✔
632
                                if strings.HasPrefix(key, "DELETED_") {
2✔
633
                                        delKey := strings.Trim(key, "DELETED_")
1✔
634
                                        requeue = deleteHandler(delKey)
1✔
635
                                } else {
2✔
636
                                        obj, exists, err := store.GetByKey(key)
1✔
637
                                        if err != nil {
1✔
638
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
639
                                        }
×
640
                                        //Handle Add/Update/Delete
641
                                        if exists && handler != nil {
2✔
642
                                                requeue = handler(obj)
1✔
643
                                        }
1✔
644
                                        //Handle Post Delete
645
                                        if !exists && postDelHandler != nil {
1✔
646
                                                requeue = postDelHandler()
×
647
                                        }
×
648
                                }
649
                        }
650
                        if requeue {
2✔
651
                                queue.AddRateLimited(key)
1✔
652
                        } else {
2✔
653
                                queue.Forget(key)
1✔
654
                        }
1✔
655
                        queue.Done(key)
1✔
656
                }
657
        }, time.Second, stopCh)
658
        <-stopCh
1✔
659
        queue.ShutDown()
1✔
660
}
661

662
func (cont *AciController) processRemIpContQueue(queue workqueue.RateLimitingInterface,
663
        handler func(interface{}) bool,
664
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
665
        go wait.Until(func() {
2✔
666
                for {
2✔
667
                        key, quit := queue.Get()
1✔
668
                        if quit {
2✔
669
                                break
1✔
670
                        }
671

672
                        var requeue bool
1✔
673
                        switch key := key.(type) {
1✔
674
                        case chan struct{}:
×
675
                                close(key)
×
676
                        case string:
1✔
677
                                if handler != nil {
2✔
678
                                        requeue = handler(key)
1✔
679
                                }
1✔
680
                                if postDelHandler != nil {
2✔
681
                                        requeue = postDelHandler()
1✔
682
                                }
1✔
683
                        }
684
                        if requeue {
1✔
685
                                queue.AddRateLimited(key)
×
686
                        } else {
1✔
687
                                queue.Forget(key)
1✔
688
                        }
1✔
689
                        queue.Done(key)
1✔
690

691
                }
692
        }, time.Second, stopCh)
693
        <-stopCh
1✔
694
        queue.ShutDown()
1✔
695
}
696

697
func (cont *AciController) processEpgDnCacheUpdateQueue(queue workqueue.RateLimitingInterface,
698
        handler func(interface{}) bool,
699
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
700
        go wait.Until(func() {
2✔
701
                for {
2✔
702
                        key, quit := queue.Get()
1✔
703
                        if quit {
2✔
704
                                break
1✔
705
                        }
706

707
                        var requeue bool
1✔
708
                        switch key := key.(type) {
1✔
709
                        case chan struct{}:
×
710
                                close(key)
×
711
                        case bool:
1✔
712
                                if handler != nil {
2✔
713
                                        requeue = handler(key)
1✔
714
                                }
1✔
715
                                if postDelHandler != nil {
1✔
716
                                        requeue = postDelHandler()
×
717
                                }
×
718
                        }
719
                        if requeue {
1✔
720
                                queue.AddRateLimited(key)
×
721
                        } else {
1✔
722
                                queue.Forget(key)
1✔
723
                        }
1✔
724
                        queue.Done(key)
1✔
725

726
                }
727
        }, time.Second, stopCh)
728
        <-stopCh
1✔
729
        queue.ShutDown()
1✔
730
}
731

732
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
733
        return apicapi.ApicSlice{}
1✔
734
}
1✔
735

736
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
737
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
738
}
1✔
739

740
func (cont *AciController) initStaticObjs() {
1✔
741
        cont.env.InitStaticAciObjects()
1✔
742
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
743
                cont.globalStaticObjs())
1✔
744
}
1✔
745

746
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
747
        vmmProv = "Kubernetes"
1✔
748
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
749
                vmmProv = "OpenShift"
×
750
        }
×
751
        return
1✔
752
}
753

754
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
755
        var err error
1✔
756
        var privKey []byte
1✔
757
        var apicCert []byte
1✔
758

1✔
759
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
760

1✔
761
        if cont.config.ApicPrivateKeyPath != "" {
1✔
762
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
763
                if err != nil {
×
764
                        panic(err)
×
765
                }
766
        }
767
        if cont.config.ApicCertPath != "" {
1✔
768
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
769
                if err != nil {
×
770
                        panic(err)
×
771
                }
772
        }
773
        // If not defined, default is 1800
774
        if cont.config.ApicRefreshTimer == "" {
2✔
775
                cont.config.ApicRefreshTimer = "1800"
1✔
776
        }
1✔
777
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
778
        if err != nil {
1✔
779
                panic(err)
×
780
        }
781
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
782

1✔
783
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
784
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
785
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
786
                panic(err)
×
787
        }
788

789
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
790
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
791
                cont.config.ApicRefreshTickerAdjust = "210"
1✔
792
        }
1✔
793
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
794
        if err != nil {
1✔
795
                panic(err)
×
796
        }
797

798
        // If not defined, default to 900s
799
        if cont.config.LeafRebootCheckInterval == 0 {
2✔
800
                cont.config.LeafRebootCheckInterval = 900
1✔
801
        }
1✔
802

803
        //If ApicSubscriptionDelay is not defined, default to 100ms
804
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
805
                cont.config.ApicSubscriptionDelay = 100
1✔
806
        }
1✔
807
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
808

1✔
809
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
810
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
811
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
812
        }
1✔
813

814
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
815
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
816
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
817
        }
1✔
818
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
819

1✔
820
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
821
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
822
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
823
        }
1✔
824

825
        // If not defined, default to 32
826
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
827
                cont.config.PodIpPoolChunkSize = 32
1✔
828
        }
1✔
829
        if !cont.isCNOEnabled() {
2✔
830
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
831
        }
1✔
832

833
        // If ApicConnectionRetryLimit is not defined, default to 5
834
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
835
                cont.config.ApicConnectionRetryLimit = 5
1✔
836
        }
1✔
837
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
838

1✔
839
        // If not valid, default to 5000-65000
1✔
840
        // other permissible values 1-65000
1✔
841
        defStart := 5000
1✔
842
        defEnd := 65000
1✔
843
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
844
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
845
        }
1✔
846
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
847
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
848
        }
1✔
849
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
850
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
851
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
UNCOV
852
                cont.config.SnatDefaultPortRangeStart = defStart
×
UNCOV
853
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
UNCOV
854
        }
×
855

856
        // Set default value for pbr programming delay if services list is not empty
857
        // and delay value is empty
858
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
859
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
860
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
UNCOV
861
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
UNCOV
862
        }
×
863
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
UNCOV
864
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
UNCOV
865
        }
×
866

867
        // Set contract scope for snat svc graph to global by default
868
        if cont.config.SnatSvcContractScope == "" {
2✔
869
                cont.config.SnatSvcContractScope = "global"
1✔
870
        }
1✔
871
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
872
                cont.config.MaxSvcGraphNodes = 32
1✔
873
        }
1✔
874
        if !cont.isCNOEnabled() {
2✔
875
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
876
        }
1✔
877
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
878
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
879
                privKey, apicCert, cont.config.AciPrefix,
1✔
880
                refreshTimeout, refreshTickerAdjust, cont.config.LeafRebootCheckInterval, cont.config.ApicSubscriptionDelay,
1✔
881
                cont.config.AciVrfTenant, cont.UpdateLLDPIfLocked, cont.isCNOEnabled())
1✔
882
        if err != nil {
1✔
UNCOV
883
                panic(err)
×
884
        }
885

886
        cont.apicConn.FilterOpflexDevice = cont.config.FilterOpflexDevice
1✔
887
        cont.apicConn.Flavor = cont.config.Flavor
1✔
888
        cont.apicConn.VmmDomain = cont.config.AciVmmDomain
1✔
889
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
890
        cont.apicConn.RequestRetryDelay = cont.config.ApicRequestRetryDelay
1✔
891
        cont.apicConn.EnableRequestRetry = cont.config.EnableApicRequestRetry
1✔
892

1✔
893
        if len(cont.config.ApicHosts) != 0 {
1✔
894
        APIC_SWITCH:
×
895
                cont.log.WithFields(logrus.Fields{
×
896
                        "mod":  "APICAPI",
×
897
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
898
                }).Debug("Connecting to APIC to determine the Version")
×
899

×
900
                version, err := cont.apicConn.GetVersion()
×
UNCOV
901
                if err != nil {
×
902
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
903
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
904
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
905
                        goto APIC_SWITCH
×
906
                }
907
                cont.apicConn.CachedVersion = version
×
908
                apicapi.ApicVersion = version
×
909
                if version >= "4.2(4i)" {
×
910
                        cont.apicConn.SnatPbrFltrChain = true
×
911
                } else {
×
UNCOV
912
                        cont.apicConn.SnatPbrFltrChain = false
×
UNCOV
913
                }
×
UNCOV
914
                if version >= "5.2" {
×
UNCOV
915
                        cont.vmmClusterFaultSupported = true
×
UNCOV
916
                }
×
917
        } else { // For unit-tests
1✔
918
                cont.apicConn.SnatPbrFltrChain = true
1✔
919
        }
1✔
920

921
        if !cont.isCNOEnabled() {
2✔
922
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
923
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
924
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
925
                        var expectedVrfRelations []string
×
926
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
UNCOV
927
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
UNCOV
928
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
UNCOV
929
                        if err != nil {
×
UNCOV
930
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
UNCOV
931
                                panic(err)
×
932
                        }
933
                }
934
        }
935

936
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.isCNOEnabled() {
1✔
937
                //Clear fault instances when the controller starts
×
938
                cont.clearFaultInstances()
×
939
                //Subscribe for vmmEpPD for a given domain
×
940
                var tnTargetFilterEpg string
×
941
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
942
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
943
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
944
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
945
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
946
                        func(obj apicapi.ApicObject) bool {
×
947
                                cont.vmmEpPDChanged(obj)
×
UNCOV
948
                                return true
×
UNCOV
949
                        },
×
UNCOV
950
                        func(dn string) {
×
UNCOV
951
                                cont.vmmEpPDDeleted(dn)
×
UNCOV
952
                        })
×
953
        }
954

955
        cont.initStaticObjs()
1✔
956

1✔
957
        err = cont.env.PrepareRun(stopCh)
1✔
958
        if err != nil {
1✔
959
                panic(err.Error())
×
960
        }
961

962
        cont.apicConn.FullSyncHook = func() {
1✔
963
                // put a channel into each work queue and wait on it to
×
964
                // checkpoint object syncing in response to new subscription
×
965
                // updates
×
966
                cont.log.Debug("Starting checkpoint")
×
967
                var chans []chan struct{}
×
968
                qs := make([]workqueue.RateLimitingInterface, 0)
×
969
                _, ok := cont.env.(*K8sEnvironment)
×
970
                if ok {
×
971
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
972
                        if !cont.isCNOEnabled() {
×
973
                                if !cont.config.DisableHppRendering {
×
974
                                        qs = append(qs, cont.netPolQueue)
×
975
                                }
×
976
                                if cont.config.EnableHppDirect {
×
977
                                        qs = append(qs, cont.remIpContQueue)
×
UNCOV
978
                                }
×
UNCOV
979
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
980
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
981
                                        cont.rdConfigQueue, cont.erspanQueue,
×
982
                                        cont.epgDnCacheUpdateQueue)
×
983
                        }
984
                }
985
                for _, q := range qs {
×
986
                        c := make(chan struct{})
×
987
                        chans = append(chans, c)
×
988
                        q.Add(c)
×
UNCOV
989
                }
×
UNCOV
990
                for _, c := range chans {
×
UNCOV
991
                        <-c
×
992
                }
×
993
                cont.log.Debug("Checkpoint complete")
×
994
        }
995

996
        if len(cont.config.ApicHosts) != 0 && !cont.isCNOEnabled() {
1✔
UNCOV
997
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
UNCOV
998
                cont.scheduleRdConfig()
×
UNCOV
999
                if strings.Contains(cont.config.Flavor, "openstack") {
×
UNCOV
1000
                        cont.setOpenStackSystemId()
×
UNCOV
1001
                }
×
1002
        }
1003

1004
        if !cont.isCNOEnabled() {
2✔
1005
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
1006
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1007
                                []string{"hostprotPol"})
1✔
1008
                }
1✔
1009
        } else {
1✔
1010
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1011
                        []string{"fvBD", "fvAp"})
1✔
1012
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
1013
                        []string{"fvnsVlanInstP"}, "")
1✔
1014
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
1015
                        []string{"infraRsDomP"}, "")
1✔
1016
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
1017
                        []string{"physDomP"}, "")
1✔
1018
                cont.apicConn.AddSubscriptionClass("l3extDomP",
1✔
1019
                        []string{"l3extDomP"}, "")
1✔
1020
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
1021
                        []string{"infraRsVlanNs"}, "")
1✔
1022
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
1023
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
1024
                cont.apicConn.AddSubscriptionClass("l3extOut",
1✔
1025
                        []string{"l3extInstP", "l3extSubnet", "fvRsCons", "fvRsProv", "l3extRsEctx", "l3extRsL3DomAtt", "l3extLNodeP", "l3extRsNodeL3OutAtt", "ipRouteP", "ipNexthopP", "l3extLIfP", "l3extVirtualLIfP", "l3extRsDynPathAtt",
1✔
1026
                                "l3extRsPathL3OutAtt", "l3extMember", "l3extIp", "bgpExtP", "bgpPeerP", "bgpAsP", "bgpLocalAsnP", "bgpRsPeerPfxPol"}, "")
1✔
1027
                cont.apicConn.AddSubscriptionClass("bgpPeerPfxPol",
1✔
1028
                        []string{"bgpPeerPfxPol"}, "")
1✔
1029
        }
1✔
1030

1031
        if cont.config.VmmLite {
1✔
1032
                cont.apicConn.AddSubscriptionClass("infraAttEntityP",
×
1033
                        []string{"infraRsFuncToEpg"}, "")
×
1034

×
1035
                cont.apicConn.SetSubscriptionHooks(
×
1036
                        "infraAttEntityP",
×
1037
                        func(obj apicapi.ApicObject) bool {
×
1038
                                cont.log.Debug("EPG attached to AAEP")
×
1039
                                cont.handleAaepEpgAttach(obj)
×
1040
                                return true
×
UNCOV
1041
                        },
×
UNCOV
1042
                        func(dn string) {
×
UNCOV
1043
                                cont.log.Debug("EPG detached from AAEP")
×
UNCOV
1044
                                cont.handleAaepEpgDetach(dn)
×
UNCOV
1045
                        },
×
1046
                )
1047
        }
1048

1049
        if !cont.isCNOEnabled() {
2✔
1050
                // When a new class is added for subscriptio, check if its name attribute
1✔
1051
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
1052
                // in apicapi.go
1✔
1053
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
1054
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
1055
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
1056
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
UNCOV
1057
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
UNCOV
1058
                }
×
1059
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
1060
                        subscribeMo)
1✔
1061
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
1062
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
1063
                        []string{"fvRsCons"})
1✔
1064
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
1065
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
1066
                        cont.config.AciVmmController)
1✔
1067
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
1068
                // Since it is not supported for APIC versions < "5.0"
1✔
1069
                cont.addVmmInjectedLabel()
1✔
1070
                cont.apicConn.AddSyncDn(vmmDn,
1✔
1071
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
1072

1✔
1073
                var tnTargetFilter string
1✔
1074
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
UNCOV
1075
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
UNCOV
1076
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
UNCOV
1077
                        }
×
1078
                } else {
1✔
1079
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
1080
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
1081
                }
1✔
1082
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
1083
                        tnTargetFilter)
1✔
1084
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
1085
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
1086

1✔
1087
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
1088
                        func(obj apicapi.ApicObject) bool {
1✔
1089
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
UNCOV
1090
                                return true
×
UNCOV
1091
                        },
×
UNCOV
1092
                        func(dn string) {
×
UNCOV
1093
                                cont.SubnetDeleted(dn)
×
UNCOV
1094
                        })
×
1095

1096
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
1097
                        []string{"opflexODev"}, "")
1✔
1098

1✔
1099
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
1100
                        func(obj apicapi.ApicObject) bool {
1✔
1101
                                cont.opflexDeviceChanged(obj)
×
UNCOV
1102
                                return true
×
UNCOV
1103
                        },
×
UNCOV
1104
                        func(dn string) {
×
UNCOV
1105
                                cont.opflexDeviceDeleted(dn)
×
UNCOV
1106
                        })
×
1107

1108
                if !cont.config.DisableServiceVlanPreprovisioning && !strings.Contains(cont.config.Flavor, "openstack") {
2✔
1109
                        if cont.config.AEP == "" {
2✔
1110
                                cont.log.Error("AEP is missing in configuration, preprovisioning of service vlan will be disabled")
1✔
1111
                        } else {
1✔
1112
                                infraRtAttEntPFilter := fmt.Sprintf("and(wcard(infraRtAttEntP.dn,\"/attentp-%s/\"))", cont.config.AEP)
×
1113
                                cont.apicConn.AddSubscriptionClass("infraRtAttEntP",
×
1114
                                        []string{"infraRtAttEntP"}, infraRtAttEntPFilter)
×
1115

×
1116
                                // For bare metal, the infraRtAttEntP associated with an AEP will be empty.
×
1117
                                // We should not receive any updates for such cases.
×
1118
                                cont.apicConn.SetSubscriptionHooks("infraRtAttEntP",
×
1119
                                        func(obj apicapi.ApicObject) bool {
×
1120
                                                cont.infraRtAttEntPChanged(obj)
×
UNCOV
1121
                                                return true
×
1122
                                        },
×
1123
                                        func(dn string) {
×
1124
                                                cont.infraRtAttEntPDeleted(dn)
×
1125
                                        })
×
1126

1127
                                cont.apicConn.AddSubscriptionClass("vpcIf",
×
1128
                                        []string{"vpcIf"}, "")
×
1129

×
1130
                                cont.apicConn.SetSubscriptionHooks("vpcIf",
×
1131
                                        func(obj apicapi.ApicObject) bool {
×
1132
                                                cont.vpcIfChanged(obj)
×
UNCOV
1133
                                                return true
×
UNCOV
1134
                                        },
×
UNCOV
1135
                                        func(dn string) {
×
UNCOV
1136
                                                cont.vpcIfDeleted(dn)
×
UNCOV
1137
                                        })
×
1138
                        }
1139
                }
1140

1141
                cont.apicConn.VersionUpdateHook =
1✔
1142
                        func() {
1✔
1143
                                cont.initStaticServiceObjs()
×
1144
                        }
×
1145
        } else if cont.config.VmmLite {
1✔
UNCOV
1146
                cont.apicConn.VMMLiteSyncHook = func() {
×
UNCOV
1147
                        cont.syncAndCleanNadCache()
×
UNCOV
1148
                        cont.syncAndCleanNads()
×
UNCOV
1149
                }
×
1150
        }
1151
        go cont.apicConn.Run(stopCh)
1✔
1152
}
1153

1154
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1155
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
1156
        ticker := time.NewTicker(seconds * time.Second)
1✔
1157
        defer ticker.Stop()
1✔
1158

1✔
1159
        for {
2✔
1160
                select {
1✔
1161
                case <-ticker.C:
1✔
1162
                        if cont.config.EnableOpflexAgentReconnect {
1✔
UNCOV
1163
                                cont.checkChangeOfOpflexOdevAciPod()
×
UNCOV
1164
                        }
×
1165
                        if cont.config.AciMultipod {
1✔
UNCOV
1166
                                cont.checkChangeOfOdevAciPod()
×
UNCOV
1167
                        }
×
1168
                case <-stopCh:
1✔
1169
                        return
1✔
1170
                }
1171
        }
1172
}
1173

1174
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1175
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
1176
        ticker := time.NewTicker(seconds * time.Second)
1✔
1177
        defer ticker.Stop()
1✔
1178

1✔
1179
        for {
2✔
1180
                select {
1✔
1181
                case <-ticker.C:
1✔
1182
                        cont.deleteOldOpflexDevices()
1✔
1183
                case <-stopCh:
1✔
1184
                        return
1✔
1185
                }
1186
        }
1187
}
1188

1189
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1190
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
1191
        ticker := time.NewTicker(seconds * time.Second)
1✔
1192
        defer ticker.Stop()
1✔
1193

1✔
1194
        for {
2✔
1195
                select {
1✔
1196
                case <-ticker.C:
1✔
1197
                        cont.processDelayedEpSlices()
1✔
1198
                case <-stopCh:
1✔
1199
                        return
1✔
1200
                }
1201
        }
1202
}
1203

1204
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
1205
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
1206
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
1207
        iteration := 0
1✔
1208
        for {
2✔
1209
                // To avoid noisy logs, only printing once in 5 minutes
1✔
1210
                if iteration%5 == 0 {
2✔
1211
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
1212
                }
1✔
1213
                var nodeInfos []*nodeinfo.NodeInfo
1✔
1214
                cont.indexMutex.Lock()
1✔
1215
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
1216
                        func(nodeInfoObj interface{}) {
2✔
1217
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
1218
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
1219
                        })
1✔
1220
                expectedmap := make(map[string]map[string]bool)
1✔
1221
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
1222
                        for nodename, entry := range glinfo {
2✔
1223
                                if _, found := expectedmap[nodename]; !found {
2✔
1224
                                        newentry := make(map[string]bool)
1✔
1225
                                        newentry[entry.SnatPolicyName] = true
1✔
1226
                                        expectedmap[nodename] = newentry
1✔
1227
                                } else {
2✔
1228
                                        currententry := expectedmap[nodename]
1✔
1229
                                        currententry[entry.SnatPolicyName] = true
1✔
1230
                                        expectedmap[nodename] = currententry
1✔
1231
                                }
1✔
1232
                        }
1233
                }
1234
                cont.indexMutex.Unlock()
1✔
1235

1✔
1236
                for _, value := range nodeInfos {
2✔
1237
                        marked := false
1✔
1238
                        policyNames := value.Spec.SnatPolicyNames
1✔
1239
                        nodeName := value.ObjectMeta.Name
1✔
1240
                        _, ok := expectedmap[nodeName]
1✔
1241
                        if !ok && len(policyNames) > 0 {
2✔
1242
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
1243
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
1244
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1245
                                marked = true
1✔
1246
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
1247
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1248
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1249
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1250
                                marked = true
1✔
1251
                        } else {
2✔
1252
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
UNCOV
1253
                                        // No snatpolicies present
×
UNCOV
1254
                                        continue
×
1255
                                }
1256
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
1257
                                if !eq {
2✔
1258
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1259
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1260
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1261
                                        marked = true
1✔
1262
                                }
1✔
1263
                        }
1264
                        if marked {
2✔
1265
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
1266
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
1267
                                if err != nil {
1✔
UNCOV
1268
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
UNCOV
1269
                                        continue
×
1270
                                }
1271
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1272
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1273
                        } else if iteration%5 == 0 {
2✔
1274
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1275
                        }
1✔
1276
                }
1277
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1278
                iteration++
1✔
1279
        }
1280
}
1281

1282
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1283
        queueStop <-chan struct{}) {
1✔
1284
        go wait.Until(func() {
2✔
1285
                for {
2✔
1286
                        syncType, quit := queue.Get()
1✔
1287
                        if quit {
2✔
1288
                                break
1✔
1289
                        }
1290
                        var requeue bool
1✔
1291
                        if sType, ok := syncType.(string); ok {
2✔
1292
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1293
                                        requeue = f()
1✔
1294
                                }
1✔
1295
                        }
1296
                        if requeue {
1✔
UNCOV
1297
                                queue.AddRateLimited(syncType)
×
1298
                        } else {
1✔
1299
                                queue.Forget(syncType)
1✔
1300
                        }
1✔
1301
                        queue.Done(syncType)
1✔
1302
                }
1303
        }, time.Second, queueStop)
1304
        <-queueStop
1✔
1305
        queue.ShutDown()
1✔
1306
}
1307

1308
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1309
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1310
}
1✔
1311
func (cont *AciController) scheduleRdConfig() {
×
UNCOV
1312
        cont.syncQueue.AddRateLimited("rdConfig")
×
UNCOV
1313
}
×
UNCOV
1314
func (cont *AciController) scheduleCreateIstioCR() {
×
1315
        cont.syncQueue.AddRateLimited("istioCR")
×
1316
}
×
1317

1318
func (cont *AciController) addVmmInjectedLabel() {
1✔
1319
        if apicapi.ApicVersion >= "5.2" {
1✔
1320
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1321
                if err != nil {
×
UNCOV
1322
                        panic(err.Error())
×
1323
                }
UNCOV
1324
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
UNCOV
1325
                if err != nil {
×
UNCOV
1326
                        panic(err.Error())
×
1327
                }
1328
        }
1329
        if apicapi.ApicVersion >= "5.0" {
2✔
1330
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1331
                if err != nil {
1✔
UNCOV
1332
                        panic(err.Error())
×
1333
                }
1334
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1335
                if err != nil {
1✔
UNCOV
1336
                        panic(err.Error())
×
1337
                }
1338
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1339
                if err != nil {
1✔
UNCOV
1340
                        panic(err.Error())
×
1341
                }
1342
        }
1343
}
1344

1345
func (cont *AciController) isCNOEnabled() bool {
1✔
1346
        return cont.config.ChainedMode || cont.config.VmmLite
1✔
1347
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc