• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11070

06 Oct 2025 05:14PM UTC coverage: 65.889% (-0.06%) from 65.945%
11070

push

travis-pro

web-flow
Merge pull request #1580 from noironetworks/fixing_vmm_lite_cdets

Added fixes for CDETS

0 of 107 new or added lines in 2 files covered. (0.0%)

2 existing lines in 1 file now uncovered.

13359 of 20275 relevant lines covered (65.89%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.56
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        v1net "k8s.io/api/networking/v1"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/util/workqueue"
40

41
        "github.com/noironetworks/aci-containers/pkg/apicapi"
42
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
43
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
44
        hppv1 "github.com/noironetworks/aci-containers/pkg/hpp/apis/aci.hpp/v1"
45
        "github.com/noironetworks/aci-containers/pkg/index"
46
        "github.com/noironetworks/aci-containers/pkg/ipam"
47
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
48
        "github.com/noironetworks/aci-containers/pkg/metadata"
49
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
50
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
51
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
52
        "github.com/noironetworks/aci-containers/pkg/util"
53
)
54

55
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
56
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
57
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
58
type listNetworkPoliciesFunc func(string) (*v1net.NetworkPolicyList, error)
59
type listNamespacesFunc func() (*v1.NamespaceList, error)
60

61
type AciController struct {
62
        log    *logrus.Logger
63
        config *ControllerConfig
64
        env    Environment
65

66
        defaultEg string
67
        defaultSg string
68

69
        unitTestMode bool
70

71
        podQueue               workqueue.RateLimitingInterface
72
        netPolQueue            workqueue.RateLimitingInterface
73
        qosQueue               workqueue.RateLimitingInterface
74
        serviceQueue           workqueue.RateLimitingInterface
75
        snatQueue              workqueue.RateLimitingInterface
76
        netflowQueue           workqueue.RateLimitingInterface
77
        erspanQueue            workqueue.RateLimitingInterface
78
        snatNodeInfoQueue      workqueue.RateLimitingInterface
79
        rdConfigQueue          workqueue.RateLimitingInterface
80
        istioQueue             workqueue.RateLimitingInterface
81
        nodeFabNetAttQueue     workqueue.RateLimitingInterface
82
        netFabConfigQueue      workqueue.RateLimitingInterface
83
        nadVlanMapQueue        workqueue.RateLimitingInterface
84
        fabricVlanPoolQueue    workqueue.RateLimitingInterface
85
        netFabL3ConfigQueue    workqueue.RateLimitingInterface
86
        remIpContQueue         workqueue.RateLimitingInterface
87
        epgDnCacheUpdateQueue  workqueue.RateLimitingInterface
88
        aaepMonitorConfigQueue workqueue.RateLimitingInterface
89

90
        namespaceIndexer                     cache.Indexer
91
        namespaceInformer                    cache.Controller
92
        podIndexer                           cache.Indexer
93
        podInformer                          cache.Controller
94
        endpointsIndexer                     cache.Indexer
95
        endpointsInformer                    cache.Controller
96
        serviceIndexer                       cache.Indexer
97
        serviceInformer                      cache.Controller
98
        replicaSetIndexer                    cache.Indexer
99
        replicaSetInformer                   cache.Controller
100
        deploymentIndexer                    cache.Indexer
101
        deploymentInformer                   cache.Controller
102
        nodeIndexer                          cache.Indexer
103
        nodeInformer                         cache.Controller
104
        networkPolicyIndexer                 cache.Indexer
105
        networkPolicyInformer                cache.Controller
106
        snatIndexer                          cache.Indexer
107
        snatInformer                         cache.Controller
108
        snatNodeInfoIndexer                  cache.Indexer
109
        snatNodeInformer                     cache.Controller
110
        snatLocalInfoInformer                cache.Controller
111
        crdInformer                          cache.Controller
112
        rdConfigInformer                     cache.Controller
113
        rdConfigIndexer                      cache.Indexer
114
        qosIndexer                           cache.Indexer
115
        qosInformer                          cache.Controller
116
        netflowIndexer                       cache.Indexer
117
        netflowInformer                      cache.Controller
118
        erspanIndexer                        cache.Indexer
119
        erspanInformer                       cache.Controller
120
        nodePodIfIndexer                     cache.Indexer
121
        nodePodIfInformer                    cache.Controller
122
        istioIndexer                         cache.Indexer
123
        istioInformer                        cache.Controller
124
        endpointSliceIndexer                 cache.Indexer
125
        endpointSliceInformer                cache.Controller
126
        snatCfgInformer                      cache.Controller
127
        updatePod                            podUpdateFunc
128
        updateNode                           nodeUpdateFunc
129
        updateServiceStatus                  serviceUpdateFunc
130
        listNetworkPolicies                  listNetworkPoliciesFunc
131
        listNamespaces                       listNamespacesFunc
132
        nodeFabNetAttInformer                cache.SharedIndexInformer
133
        netFabConfigInformer                 cache.SharedIndexInformer
134
        nadVlanMapInformer                   cache.SharedIndexInformer
135
        fabricVlanPoolInformer               cache.SharedIndexInformer
136
        networkFabricL3ConfigurationInformer cache.SharedIndexInformer
137
        fabNetAttClient                      *fabattclset.Clientset
138
        proactiveConfInformer                cache.SharedIndexInformer
139
        aaepMonitorInformer                  cache.SharedIndexInformer
140
        poster                               *EventPoster
141

142
        indexMutex sync.Mutex
143
        hppMutex   sync.Mutex
144

145
        configuredPodNetworkIps *netIps
146
        podNetworkIps           *netIps
147
        serviceIps              *ipam.IpCache
148
        staticServiceIps        *netIps
149
        nodeServiceIps          *netIps
150

151
        // index of pods matched by deployments
152
        depPods *index.PodSelectorIndex
153
        // index of pods matched by network policies
154
        netPolPods *index.PodSelectorIndex
155
        // index of pods matched by network policy ingress rules
156
        netPolIngressPods *index.PodSelectorIndex
157
        // index of pods matched by network policy egress rules
158
        netPolEgressPods *index.PodSelectorIndex
159
        // index of IP addresses contained in endpoints objects
160
        endpointsIpIndex cidranger.Ranger
161
        // index of service target ports
162
        targetPortIndex map[string]*portIndexEntry
163
        // index of ip blocks referenced by network policy egress rules
164
        netPolSubnetIndex cidranger.Ranger
165
        // index of pods matched by erspan policies
166
        erspanPolPods *index.PodSelectorIndex
167

168
        apicConn *apicapi.ApicConnection
169

170
        nodeServiceMetaCache map[string]*nodeServiceMeta
171
        nodeACIPod           map[string]aciPodAnnot
172
        nodeACIPodAnnot      map[string]aciPodAnnot
173
        nodeOpflexDevice     map[string]apicapi.ApicSlice
174
        nodePodNetCache      map[string]*nodePodNetMeta
175
        serviceMetaCache     map[string]*serviceMeta
176
        snatPolicyCache      map[string]*ContSnatPolicy
177
        delayedEpSlices      []*DelayedEpSlice
178
        snatServices         map[string]bool
179
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
180
        rdConfigCache        map[string]*rdConfig.RdConfig
181
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
182
        istioCache           map[string]*istiov1.AciIstioOperator
183
        podIftoEp            map[string]*EndPointData
184
        // Node Name and Policy Name
185
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
186
        nodeSyncEnabled     bool
187
        serviceSyncEnabled  bool
188
        snatSyncEnabled     bool
189
        syncQueue           workqueue.RateLimitingInterface
190
        syncProcessors      map[string]func() bool
191
        serviceEndPoints    ServiceEndPointType
192
        crdHandlers         map[string]func(*AciController, <-chan struct{})
193
        stopCh              <-chan struct{}
194
        //index of containerportname to ctrPortNameEntry
195
        ctrPortNameCache map[string]*ctrPortNameEntry
196
        // named networkPolicies
197
        nmPortNp map[string]bool
198
        //maps network policy hash to hpp
199
        hppRef map[string]hppReference
200
        //map for ns to remoteIpConts
201
        nsRemoteIpCont map[string]remoteIpConts
202
        // cache to look for Epg DNs which are bound to Vmm domain
203
        cachedEpgDns             []string
204
        vmmClusterFaultSupported bool
205
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
206
        //Used in Shared mode
207
        sharedEncapCache       map[int]*sharedEncapData
208
        sharedEncapAepCache    map[string]map[int]bool
209
        sharedEncapSviCache    map[int]*NfL3Data
210
        sharedEncapVrfCache    map[string]*NfVrfData
211
        sharedEncapTenantCache map[string]*NfTenantData
212
        nfl3configGenerationId int64
213
        // vlan to propertiesList
214
        sharedEncapNfcCache         map[int]*NfcData
215
        sharedEncapNfcVlanMap       map[int]*NfcData
216
        sharedEncapNfcLabelMap      map[string]*NfcData
217
        sharedEncapNfcAppProfileMap map[string]bool
218
        // nadVlanMap encapLabel to vlan
219
        sharedEncapLabelMap      map[string][]int
220
        lldpIfCache              map[string]*NfLLDPIfData
221
        globalVlanConfig         globalVlanConfig
222
        fabricVlanPoolMap        map[string]map[string]string
223
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
224
        hostFabricPathDnMap      map[string]hostFabricInfo
225
        openStackSystemId        string
226
        sharedAaepMonitor        map[string][]*AaepMonitoringData
227
}
228

229
// hostFabricInfo is the value type of AciController.hostFabricPathDnMap. It
// groups a fabric path DN and a host name with a set of VPC interface DNs.
type hostFabricInfo struct {
	fabricPathDn string
	host         string
	// vpcIfDn is used as a set: only the keys carry information.
	vpcIfDn map[string]struct{}
}
234

235
// NfLLDPIfData is the value type of AciController.lldpIfCache: an LLDP
// interface identifier plus the set of references to it.
type NfLLDPIfData struct {
	LLDPIf string
	// Refs is currently tracked at NAD granularity; anything finer would
	// bring interface-tracking complexity without enough benefit.
	Refs map[string]bool
}
242

243
type NfL3OutData struct {
244
        // +kubebuilder:validation:Enum:"import"
245
        RtCtrl     string
246
        PodId      int
247
        RtrNodeMap map[int]*fabattv1.FabricL3OutRtrNode
248
        ExtEpgMap  map[string]*fabattv1.PolicyPrefixGroup
249
        SviMap     map[int]bool
250
}
251

252
type NfTenantData struct {
253
        CommonTenant     bool
254
        L3OutConfig      map[string]*NfL3OutData
255
        BGPPeerPfxConfig map[string]*fabattv1.BGPPeerPrefixPolicy
256
}
257

258
type NfVrfData struct {
259
        TenantConfig map[string]*NfTenantData
260
}
261

262
type NfL3Networks struct {
263
        fabattv1.PrimaryNetwork
264
        Subnets map[string]*fabattv1.FabricL3Subnet
265
}
266

267
type NfL3Data struct {
268
        Tenant      string
269
        Vrf         fabattv1.VRF
270
        PodId       int
271
        ConnectedNw *NfL3Networks
272
        NetAddr     map[string]*RoutedNetworkData
273
        Nodes       map[int]fabattv1.FabricL3OutNode
274
}
275

276
type (
	// remoteIpConts maps a pod name to that pod's remoteIpCont.
	remoteIpConts map[string]remoteIpCont

	// remoteIpCont maps an IP address to the pod's labels.
	remoteIpCont map[string]map[string]string
)
281

282
type NfcData struct {
283
        Aeps map[string]bool
284
        Epg  fabattv1.Epg
285
}
286

287
type sharedEncapData struct {
288
        //node to NAD to pods
289
        Pods   map[string]map[string][]string
290
        NetRef map[string]*AdditionalNetworkMeta
291
        Aeps   map[string]bool
292
}
293

294
type globalVlanConfig struct {
295
        SharedPhysDom apicapi.ApicObject
296
        SharedL3Dom   apicapi.ApicObject
297
}
298

299
type hppReference struct {
300
        RefCount uint              `json:"ref-count,omitempty"`
301
        Npkeys   []string          `json:"npkeys,omitempty"`
302
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
303
        HppCr    hppv1.HostprotPol `json:"hpp-cr,omitempty"`
304
}
305

306
type DelayedEpSlice struct {
307
        ServiceKey  string
308
        OldEpSlice  *discovery.EndpointSlice
309
        NewEpSlice  *discovery.EndpointSlice
310
        DelayedTime time.Time
311
}
312

313
// aciPodAnnot is the value type of AciController.nodeACIPod and
// nodeACIPodAnnot. It stores an ACI pod string, a flag for the
// single-opflex-odev case and three timestamps (disconnect, connect, last
// error).
type aciPodAnnot struct {
	aciPod             string
	isSingleOpflexOdev bool
	disconnectTime     time.Time
	connectTime        time.Time
	lastErrorTime      time.Time
}
320

321
type nodeServiceMeta struct {
322
        serviceEp metadata.ServiceEndpoint
323
}
324

325
type nodePodNetMeta struct {
326
        nodePods            map[string]bool
327
        podNetIps           metadata.NetIps
328
        podNetIpsAnnotation string
329
}
330

331
// openstackOpflexOdevInfo is the value type of
// AciController.openStackFabricPathDnMap: a set of opflex ODev DNs and a
// fabric path DN.
type openstackOpflexOdevInfo struct {
	// opflexODevDn is used as a set: only the keys carry information.
	opflexODevDn map[string]struct{}
	fabricPathDn string
}
335

336
// serviceMeta is the value type of AciController.serviceMetaCache. It keeps
// three IP lists for a service: requested, ingress and static ingress.
type serviceMeta struct {
	requestedIps     []net.IP
	ingressIps       []net.IP
	staticIngressIps []net.IP
}
341

342
// ipIndexEntry associates an IP network with a set of string keys. Its
// Network method returns ipNet.
type ipIndexEntry struct {
	ipNet net.IPNet
	keys  map[string]bool
}
346

347
type targetPort struct {
348
        proto v1.Protocol
349
        ports []int
350
}
351

352
type portIndexEntry struct {
353
        port              targetPort
354
        serviceKeys       map[string]bool
355
        networkPolicyKeys map[string]bool
356
}
357

358
// portRangeSnat is a port range given by its start and end values (whether
// end is inclusive is not visible here; check the users of this type).
type portRangeSnat struct {
	start int
	end   int
}
362

363
// EndPointData is the controller-side record of PodIF data: MAC address,
// EPG, namespace and application profile. AciController stores these in
// podIftoEp.
type EndPointData struct {
	MacAddr    string
	EPG        string
	Namespace  string
	AppProfile string
}
370

371
// ctrPortNameEntry is the value type of AciController.ctrPortNameCache, which
// is keyed by container port name.
type ctrPortNameEntry struct {
	// ctrNmpToPods maps "proto+port" to the set of pods using it.
	ctrNmpToPods map[string]map[string]bool
}
375

376
// LinkData lists links together with pods. It is the innermost value of
// AdditionalNetworkMeta.FabricLink.
type LinkData struct {
	Link []string
	Pods []string
}
380

381
// RoutedNodeData is the per-node value in RoutedNetworkData.nodeMap: an
// address string and an integer index.
type RoutedNodeData struct {
	addr string
	idx  int
}
385

386
type RoutedNetworkData struct {
387
        subnet       string
388
        netAddress   string
389
        maskLen      int
390
        numAllocated int
391
        maxAddresses int
392
        baseAddress  net.IP
393
        nodeMap      map[string]RoutedNodeData
394
        availableMap map[int]bool
395
}
396

397
type AdditionalNetworkMeta struct {
398
        NetworkName string
399
        EncapVlan   string
400
        //node+localiface->fabricLinks
401
        FabricLink map[string]map[string]LinkData
402
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
403
        Mode       util.EncapMode
404
}
405

406
type ServiceEndPointType interface {
407
        InitClientInformer(kubeClient *kubernetes.Clientset)
408
        Run(stopCh <-chan struct{})
409
        Wait(stopCh <-chan struct{})
410
        UpdateServicesForNode(nodename string)
411
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
412
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
413
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
414
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
415
}
416

417
type serviceEndpoint struct {
418
        cont *AciController
419
}
420
type serviceEndpointSlice struct {
421
        cont *AciController
422
}
423

424
// AaepEpgAttachData pairs an EPG DN with an encap VLAN number.
type AaepEpgAttachData struct {
	epgDn     string
	encapVlan int
}
428

429
type AaepMonitoringData struct {
430
        aaepEpgData   AaepEpgAttachData
431
        nadName       string
432
        namespaceName string
433
}
434

435
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
436
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
437
}
×
438

439
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
440
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
441
}
×
442

443
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
444
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
445
}
1✔
446

447
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
448
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
449
}
1✔
450

451
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
452
        cache.WaitForCacheSync(stopCh,
1✔
453
                sep.cont.endpointsInformer.HasSynced,
1✔
454
                sep.cont.serviceInformer.HasSynced)
1✔
455
}
1✔
456

457
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
458
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
459
        cache.WaitForCacheSync(stopCh,
1✔
460
                seps.cont.endpointSliceInformer.HasSynced,
1✔
461
                seps.cont.serviceInformer.HasSynced)
1✔
462
}
1✔
463

464
func (e *ipIndexEntry) Network() net.IPNet {
1✔
465
        return e.ipNet
1✔
466
}
1✔
467

468
func newNodePodNetMeta() *nodePodNetMeta {
1✔
469
        return &nodePodNetMeta{
1✔
470
                nodePods: make(map[string]bool),
1✔
471
        }
1✔
472
}
1✔
473

474
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
475
        return workqueue.NewNamedRateLimitingQueue(
1✔
476
                workqueue.NewMaxOfRateLimiter(
1✔
477
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
478
                                10*time.Second),
1✔
479
                        &workqueue.BucketRateLimiter{
1✔
480
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
481
                        },
1✔
482
                ),
1✔
483
                "delta")
1✔
484
}
1✔
485

486
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
487
        cont := &AciController{
1✔
488
                log:          log,
1✔
489
                config:       config,
1✔
490
                env:          env,
1✔
491
                defaultEg:    "",
1✔
492
                defaultSg:    "",
1✔
493
                unitTestMode: unittestmode,
1✔
494

1✔
495
                podQueue:               createQueue("pod"),
1✔
496
                netPolQueue:            createQueue("networkPolicy"),
1✔
497
                qosQueue:               createQueue("qos"),
1✔
498
                netflowQueue:           createQueue("netflow"),
1✔
499
                erspanQueue:            createQueue("erspan"),
1✔
500
                serviceQueue:           createQueue("service"),
1✔
501
                snatQueue:              createQueue("snat"),
1✔
502
                snatNodeInfoQueue:      createQueue("snatnodeinfo"),
1✔
503
                rdConfigQueue:          createQueue("rdconfig"),
1✔
504
                istioQueue:             createQueue("istio"),
1✔
505
                nodeFabNetAttQueue:     createQueue("nodefabricnetworkattachment"),
1✔
506
                netFabConfigQueue:      createQueue("networkfabricconfiguration"),
1✔
507
                nadVlanMapQueue:        createQueue("nadvlanmap"),
1✔
508
                fabricVlanPoolQueue:    createQueue("fabricvlanpool"),
1✔
509
                netFabL3ConfigQueue:    createQueue("networkfabricl3configuration"),
1✔
510
                remIpContQueue:         createQueue("remoteIpContainer"),
1✔
511
                epgDnCacheUpdateQueue:  createQueue("epgDnCache"),
1✔
512
                aaepMonitorConfigQueue: createQueue("aaepepgmap"),
1✔
513
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
514
                        &workqueue.BucketRateLimiter{
1✔
515
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
516
                        }, "sync"),
1✔
517

1✔
518
                configuredPodNetworkIps: newNetIps(),
1✔
519
                podNetworkIps:           newNetIps(),
1✔
520
                serviceIps:              ipam.NewIpCache(),
1✔
521
                staticServiceIps:        newNetIps(),
1✔
522
                nodeServiceIps:          newNetIps(),
1✔
523

1✔
524
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
525
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
526
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
527

1✔
528
                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
1✔
529
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
1✔
530
                serviceMetaCache:            make(map[string]*serviceMeta),
1✔
531
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
1✔
532
                snatServices:                make(map[string]bool),
1✔
533
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
1✔
534
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
1✔
535
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
1✔
536
                podIftoEp:                   make(map[string]*EndPointData),
1✔
537
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
538
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
1✔
539
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
1✔
540
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
1✔
541
                nmPortNp:                    make(map[string]bool),
1✔
542
                hppRef:                      make(map[string]hppReference),
1✔
543
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
1✔
544
                sharedEncapCache:            make(map[int]*sharedEncapData),
1✔
545
                sharedEncapAepCache:         make(map[string]map[int]bool),
1✔
546
                sharedEncapSviCache:         make(map[int]*NfL3Data),
1✔
547
                sharedEncapVrfCache:         make(map[string]*NfVrfData),
1✔
548
                sharedEncapTenantCache:      make(map[string]*NfTenantData),
1✔
549
                sharedEncapNfcCache:         make(map[int]*NfcData),
1✔
550
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
1✔
551
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
1✔
552
                sharedEncapNfcAppProfileMap: make(map[string]bool),
1✔
553
                sharedEncapLabelMap:         make(map[string][]int),
1✔
554
                lldpIfCache:                 make(map[string]*NfLLDPIfData),
1✔
555
                fabricVlanPoolMap:           make(map[string]map[string]string),
1✔
556
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
1✔
557
                hostFabricPathDnMap:         make(map[string]hostFabricInfo),
1✔
558
                nsRemoteIpCont:              make(map[string]remoteIpConts),
1✔
559
                sharedAaepMonitor:           make(map[string][]*AaepMonitoringData),
1✔
560
        }
1✔
561
        cont.syncProcessors = map[string]func() bool{
1✔
562
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
563
                "rdConfig":       cont.syncRdConfig,
1✔
564
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
565
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
566
                   and aci-containers-operator images for istio.io/istio package
1✔
567

1✔
568
                "istioCR":        cont.createIstioCR,
1✔
569
                */
1✔
570
        }
1✔
571
        return cont
1✔
572
}
1✔
573

574
func (cont *AciController) Init() {
×
575
        if cont.config.ChainedMode {
×
576
                cont.log.Info("In chained mode")
×
577
        }
×
578
        if cont.config.VmmLite {
×
579
                cont.log.Info("In VMM lite mode")
×
580
        }
×
581

582
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
583
        if err != nil {
×
584
                cont.log.Error("Could not serialize default endpoint group")
×
585
                panic(err.Error())
×
586
        }
587
        cont.defaultEg = string(egdata)
×
588

×
589
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
590
        if err != nil {
×
591
                cont.log.Error("Could not serialize default security groups")
×
592
                panic(err.Error())
×
593
        }
594
        cont.defaultSg = string(sgdata)
×
595

×
596
        cont.log.Debug("Initializing IPAM")
×
597
        cont.initIpam()
×
598
        // check if the cluster supports endpoint slices
×
599
        // if cluster doesn't have the support fallback to endpoints
×
600
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
601
        if util.IsEndPointSlicesSupported(kubeClient) {
×
602
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
603
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
604
                cont.log.Info("Initializing ServiceEndpointSlices")
×
605
        } else {
×
606
                cont.serviceEndPoints = &serviceEndpoint{}
×
607
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
608
                cont.log.Info("Initializing ServiceEndpoints")
×
609
        }
×
610

611
        err = cont.env.Init(cont)
×
612
        if err != nil {
×
613
                panic(err.Error())
×
614
        }
615
}
616

617
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
618
        store cache.Store, handler func(interface{}) bool,
619
        deleteHandler func(string) bool,
620
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
621
        go wait.Until(func() {
2✔
622
                for {
2✔
623
                        key, quit := queue.Get()
1✔
624
                        if quit {
2✔
625
                                break
1✔
626
                        }
627

628
                        var requeue bool
1✔
629
                        switch key := key.(type) {
1✔
630
                        case chan struct{}:
×
631
                                close(key)
×
632
                        case string:
1✔
633
                                if strings.HasPrefix(key, "DELETED_") {
2✔
634
                                        delKey := strings.Trim(key, "DELETED_")
1✔
635
                                        requeue = deleteHandler(delKey)
1✔
636
                                } else {
2✔
637
                                        obj, exists, err := store.GetByKey(key)
1✔
638
                                        if err != nil {
1✔
639
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
640
                                        }
×
641
                                        //Handle Add/Update/Delete
642
                                        if exists && handler != nil {
2✔
643
                                                requeue = handler(obj)
1✔
644
                                        }
1✔
645
                                        //Handle Post Delete
646
                                        if !exists && postDelHandler != nil {
1✔
647
                                                requeue = postDelHandler()
×
648
                                        }
×
649
                                }
650
                        }
651
                        if requeue {
2✔
652
                                queue.AddRateLimited(key)
1✔
653
                        } else {
2✔
654
                                queue.Forget(key)
1✔
655
                        }
1✔
656
                        queue.Done(key)
1✔
657
                }
658
        }, time.Second, stopCh)
659
        <-stopCh
1✔
660
        queue.ShutDown()
1✔
661
}
662

663
func (cont *AciController) processRemIpContQueue(queue workqueue.RateLimitingInterface,
664
        handler func(interface{}) bool,
665
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
666
        go wait.Until(func() {
2✔
667
                for {
2✔
668
                        key, quit := queue.Get()
1✔
669
                        if quit {
2✔
670
                                break
1✔
671
                        }
672

673
                        var requeue bool
1✔
674
                        switch key := key.(type) {
1✔
675
                        case chan struct{}:
×
676
                                close(key)
×
677
                        case string:
1✔
678
                                if handler != nil {
2✔
679
                                        requeue = handler(key)
1✔
680
                                }
1✔
681
                                if postDelHandler != nil {
2✔
682
                                        requeue = postDelHandler()
1✔
683
                                }
1✔
684
                        }
685
                        if requeue {
1✔
686
                                queue.AddRateLimited(key)
×
687
                        } else {
1✔
688
                                queue.Forget(key)
1✔
689
                        }
1✔
690
                        queue.Done(key)
1✔
691

692
                }
693
        }, time.Second, stopCh)
694
        <-stopCh
1✔
695
        queue.ShutDown()
1✔
696
}
697

698
func (cont *AciController) processEpgDnCacheUpdateQueue(queue workqueue.RateLimitingInterface,
699
        handler func(interface{}) bool,
700
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
701
        go wait.Until(func() {
2✔
702
                for {
2✔
703
                        key, quit := queue.Get()
1✔
704
                        if quit {
2✔
705
                                break
1✔
706
                        }
707

708
                        var requeue bool
1✔
709
                        switch key := key.(type) {
1✔
710
                        case chan struct{}:
×
711
                                close(key)
×
712
                        case bool:
1✔
713
                                if handler != nil {
2✔
714
                                        requeue = handler(key)
1✔
715
                                }
1✔
716
                                if postDelHandler != nil {
1✔
717
                                        requeue = postDelHandler()
×
718
                                }
×
719
                        }
720
                        if requeue {
1✔
721
                                queue.AddRateLimited(key)
×
722
                        } else {
1✔
723
                                queue.Forget(key)
1✔
724
                        }
1✔
725
                        queue.Done(key)
1✔
726

727
                }
728
        }, time.Second, stopCh)
729
        <-stopCh
1✔
730
        queue.ShutDown()
1✔
731
}
732

733
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
734
        return apicapi.ApicSlice{}
1✔
735
}
1✔
736

737
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
738
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
739
}
1✔
740

741
func (cont *AciController) initStaticObjs() {
1✔
742
        cont.env.InitStaticAciObjects()
1✔
743
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
744
                cont.globalStaticObjs())
1✔
745
}
1✔
746

747
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
748
        vmmProv = "Kubernetes"
1✔
749
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
750
                vmmProv = "OpenShift"
×
751
        }
×
752
        return
1✔
753
}
754

755
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
756
        var err error
1✔
757
        var privKey []byte
1✔
758
        var apicCert []byte
1✔
759

1✔
760
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
761

1✔
762
        if cont.config.ApicPrivateKeyPath != "" {
1✔
763
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
764
                if err != nil {
×
765
                        panic(err)
×
766
                }
767
        }
768
        if cont.config.ApicCertPath != "" {
1✔
769
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
770
                if err != nil {
×
771
                        panic(err)
×
772
                }
773
        }
774
        // If not defined, default is 1800
775
        if cont.config.ApicRefreshTimer == "" {
2✔
776
                cont.config.ApicRefreshTimer = "1800"
1✔
777
        }
1✔
778
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
779
        if err != nil {
1✔
780
                panic(err)
×
781
        }
782
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
783

1✔
784
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
785
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
786
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
787
                panic(err)
×
788
        }
789

790
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
791
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
792
                cont.config.ApicRefreshTickerAdjust = "210"
1✔
793
        }
1✔
794
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
795
        if err != nil {
1✔
796
                panic(err)
×
797
        }
798

799
        //If ApicSubscriptionDelay is not defined, default to 100ms
800
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
801
                cont.config.ApicSubscriptionDelay = 100
1✔
802
        }
1✔
803
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
804

1✔
805
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
806
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
807
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
808
        }
1✔
809

810
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
811
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
812
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
813
        }
1✔
814
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
815

1✔
816
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
817
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
818
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
819
        }
1✔
820

821
        // If not defined, default to 32
822
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
823
                cont.config.PodIpPoolChunkSize = 32
1✔
824
        }
1✔
825
        if !cont.isCNOEnabled() {
2✔
826
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
827
        }
1✔
828

829
        // If ApicConnectionRetryLimit is not defined, default to 5
830
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
831
                cont.config.ApicConnectionRetryLimit = 5
1✔
832
        }
1✔
833
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
834

1✔
835
        // If not valid, default to 5000-65000
1✔
836
        // other permissible values 1-65000
1✔
837
        defStart := 5000
1✔
838
        defEnd := 65000
1✔
839
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
840
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
841
        }
1✔
842
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
843
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
844
        }
1✔
845
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
846
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
847
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
848
                cont.config.SnatDefaultPortRangeStart = defStart
×
849
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
850
        }
×
851

852
        // Set default value for pbr programming delay if services list is not empty
853
        // and delay value is empty
854
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
855
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
856
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
857
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
858
        }
×
859
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
860
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
861
        }
×
862

863
        // Set contract scope for snat svc graph to global by default
864
        if cont.config.SnatSvcContractScope == "" {
2✔
865
                cont.config.SnatSvcContractScope = "global"
1✔
866
        }
1✔
867
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
868
                cont.config.MaxSvcGraphNodes = 32
1✔
869
        }
1✔
870
        if !cont.isCNOEnabled() {
2✔
871
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
872
        }
1✔
873
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
874
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
875
                privKey, apicCert, cont.config.AciPrefix,
1✔
876
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
877
                cont.config.AciVrfTenant, cont.UpdateLLDPIfLocked)
1✔
878
        if err != nil {
1✔
879
                panic(err)
×
880
        }
881

882
        cont.apicConn.FilterOpflexDevice = cont.config.FilterOpflexDevice
1✔
883
        cont.apicConn.Flavor = cont.config.Flavor
1✔
884
        cont.apicConn.VmmDomain = cont.config.AciVmmDomain
1✔
885
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
886
        cont.apicConn.RequestRetryDelay = cont.config.ApicRequestRetryDelay
1✔
887
        cont.apicConn.EnableRequestRetry = cont.config.EnableApicRequestRetry
1✔
888

1✔
889
        if len(cont.config.ApicHosts) != 0 {
1✔
890
        APIC_SWITCH:
×
891
                cont.log.WithFields(logrus.Fields{
×
892
                        "mod":  "APICAPI",
×
893
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
894
                }).Debug("Connecting to APIC to determine the Version")
×
895

×
896
                version, err := cont.apicConn.GetVersion()
×
897
                if err != nil {
×
898
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
899
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
900
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
901
                        goto APIC_SWITCH
×
902
                }
903
                cont.apicConn.CachedVersion = version
×
904
                apicapi.ApicVersion = version
×
905
                if version >= "4.2(4i)" {
×
906
                        cont.apicConn.SnatPbrFltrChain = true
×
907
                } else {
×
908
                        cont.apicConn.SnatPbrFltrChain = false
×
909
                }
×
910
                if version >= "5.2" {
×
911
                        cont.vmmClusterFaultSupported = true
×
912
                }
×
913
        } else { // For unit-tests
1✔
914
                cont.apicConn.SnatPbrFltrChain = true
1✔
915
        }
1✔
916

917
        if !cont.isCNOEnabled() {
2✔
918
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
919
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
920
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
921
                        var expectedVrfRelations []string
×
922
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
923
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
924
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
925
                        if err != nil {
×
926
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
927
                                panic(err)
×
928
                        }
929
                }
930
        }
931

932
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.isCNOEnabled() {
1✔
933
                //Clear fault instances when the controller starts
×
934
                cont.clearFaultInstances()
×
935
                //Subscribe for vmmEpPD for a given domain
×
936
                var tnTargetFilterEpg string
×
937
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
938
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
939
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
940
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
941
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
942
                        func(obj apicapi.ApicObject) bool {
×
943
                                cont.vmmEpPDChanged(obj)
×
944
                                return true
×
945
                        },
×
946
                        func(dn string) {
×
947
                                cont.vmmEpPDDeleted(dn)
×
948
                        })
×
949
        }
950

951
        cont.initStaticObjs()
1✔
952

1✔
953
        err = cont.env.PrepareRun(stopCh)
1✔
954
        if err != nil {
1✔
955
                panic(err.Error())
×
956
        }
957

958
        cont.apicConn.FullSyncHook = func() {
1✔
959
                // put a channel into each work queue and wait on it to
×
960
                // checkpoint object syncing in response to new subscription
×
961
                // updates
×
962
                cont.log.Debug("Starting checkpoint")
×
963
                var chans []chan struct{}
×
964
                qs := make([]workqueue.RateLimitingInterface, 0)
×
965
                _, ok := cont.env.(*K8sEnvironment)
×
966
                if ok {
×
967
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
968
                        if !cont.isCNOEnabled() {
×
969
                                if !cont.config.DisableHppRendering {
×
970
                                        qs = append(qs, cont.netPolQueue)
×
971
                                }
×
972
                                if cont.config.EnableHppDirect {
×
973
                                        qs = append(qs, cont.remIpContQueue)
×
974
                                }
×
975
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
976
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
977
                                        cont.rdConfigQueue, cont.erspanQueue,
×
978
                                        cont.epgDnCacheUpdateQueue)
×
979
                        }
980
                }
981
                for _, q := range qs {
×
982
                        c := make(chan struct{})
×
983
                        chans = append(chans, c)
×
984
                        q.Add(c)
×
985
                }
×
986
                for _, c := range chans {
×
987
                        <-c
×
988
                }
×
989
                cont.log.Debug("Checkpoint complete")
×
990
        }
991

992
        if len(cont.config.ApicHosts) != 0 && !cont.isCNOEnabled() {
1✔
993
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
994
                cont.scheduleRdConfig()
×
995
                if strings.Contains(cont.config.Flavor, "openstack") {
×
996
                        cont.setOpenStackSystemId()
×
997
                }
×
998
        }
999

1000
        if !cont.isCNOEnabled() {
2✔
1001
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
1002
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1003
                                []string{"hostprotPol"})
1✔
1004
                }
1✔
1005
        } else {
1✔
1006
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1007
                        []string{"fvBD", "fvAp"})
1✔
1008
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
1009
                        []string{"fvnsVlanInstP"}, "")
1✔
1010
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
1011
                        []string{"infraRsDomP"}, "")
1✔
1012
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
1013
                        []string{"physDomP"}, "")
1✔
1014
                cont.apicConn.AddSubscriptionClass("l3extDomP",
1✔
1015
                        []string{"l3extDomP"}, "")
1✔
1016
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
1017
                        []string{"infraRsVlanNs"}, "")
1✔
1018
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
1019
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
1020
                cont.apicConn.AddSubscriptionClass("l3extOut",
1✔
1021
                        []string{"l3extInstP", "l3extSubnet", "fvRsCons", "fvRsProv", "l3extRsEctx", "l3extRsL3DomAtt", "l3extLNodeP", "l3extRsNodeL3OutAtt", "ipRouteP", "ipNexthopP", "l3extLIfP", "l3extVirtualLIfP", "l3extRsDynPathAtt",
1✔
1022
                                "l3extRsPathL3OutAtt", "l3extMember", "l3extIp", "bgpExtP", "bgpPeerP", "bgpAsP", "bgpLocalAsnP", "bgpRsPeerPfxPol"}, "")
1✔
1023
                cont.apicConn.AddSubscriptionClass("bgpPeerPfxPol",
1✔
1024
                        []string{"bgpPeerPfxPol"}, "")
1✔
1025
        }
1✔
1026

1027
        if cont.config.VmmLite {
1✔
1028
                cont.apicConn.AddSubscriptionClass("infraAttEntityP",
×
1029
                        []string{"infraRsFuncToEpg"}, "")
×
1030

×
1031
                cont.apicConn.SetSubscriptionHooks(
×
1032
                        "infraAttEntityP",
×
1033
                        func(obj apicapi.ApicObject) bool {
×
NEW
1034
                                cont.log.Debug("EPG attached to AAEP")
×
1035
                                cont.handleAaepEpgAttach(obj)
×
1036
                                return true
×
1037
                        },
×
1038
                        func(dn string) {
×
NEW
1039
                                cont.log.Debug("EPG detached from AAEP")
×
1040
                                cont.handleAaepEpgDetach(dn)
×
1041
                        },
×
1042
                )
1043
        }
1044

1045
        if !cont.isCNOEnabled() {
2✔
1046
                // When a new class is added for subscriptio, check if its name attribute
1✔
1047
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
1048
                // in apicapi.go
1✔
1049
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
1050
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
1051
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
1052
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
1053
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
1054
                }
×
1055
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
1056
                        subscribeMo)
1✔
1057
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
1058
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
1059
                        []string{"fvRsCons"})
1✔
1060
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
1061
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
1062
                        cont.config.AciVmmController)
1✔
1063
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
1064
                // Since it is not supported for APIC versions < "5.0"
1✔
1065
                cont.addVmmInjectedLabel()
1✔
1066
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
1067
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
1068

1✔
1069
                var tnTargetFilter string
1✔
1070
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
1071
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
1072
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
1073
                        }
×
1074
                } else {
1✔
1075
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
1076
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
1077
                }
1✔
1078
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
1079
                        tnTargetFilter)
1✔
1080
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
1081
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
1082

1✔
1083
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
1084
                        func(obj apicapi.ApicObject) bool {
1✔
1085
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
1086
                                return true
×
1087
                        },
×
1088
                        func(dn string) {
×
1089
                                cont.SubnetDeleted(dn)
×
1090
                        })
×
1091

1092
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
1093
                        []string{"opflexODev"}, "")
1✔
1094

1✔
1095
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
1096
                        func(obj apicapi.ApicObject) bool {
1✔
1097
                                cont.opflexDeviceChanged(obj)
×
1098
                                return true
×
1099
                        },
×
1100
                        func(dn string) {
×
1101
                                cont.opflexDeviceDeleted(dn)
×
1102
                        })
×
1103

1104
                if !cont.config.DisableServiceVlanPreprovisioning && !strings.Contains(cont.config.Flavor, "openstack") {
2✔
1105
                        if cont.config.AEP == "" {
2✔
1106
                                cont.log.Error("AEP is missing in configuration, preprovisioning of service vlan will be disabled")
1✔
1107
                        } else {
1✔
1108
                                infraRtAttEntPFilter := fmt.Sprintf("and(wcard(infraRtAttEntP.dn,\"/attentp-%s/\"))", cont.config.AEP)
×
1109
                                cont.apicConn.AddSubscriptionClass("infraRtAttEntP",
×
1110
                                        []string{"infraRtAttEntP"}, infraRtAttEntPFilter)
×
1111

×
1112
                                // For bare metal, the infraRtAttEntP associated with an AEP will be empty.
×
1113
                                // We should not receive any updates for such cases.
×
1114
                                cont.apicConn.SetSubscriptionHooks("infraRtAttEntP",
×
1115
                                        func(obj apicapi.ApicObject) bool {
×
1116
                                                cont.infraRtAttEntPChanged(obj)
×
1117
                                                return true
×
1118
                                        },
×
1119
                                        func(dn string) {
×
1120
                                                cont.infraRtAttEntPDeleted(dn)
×
1121
                                        })
×
1122

1123
                                cont.apicConn.AddSubscriptionClass("vpcIf",
×
1124
                                        []string{"vpcIf"}, "")
×
1125

×
1126
                                cont.apicConn.SetSubscriptionHooks("vpcIf",
×
1127
                                        func(obj apicapi.ApicObject) bool {
×
1128
                                                cont.vpcIfChanged(obj)
×
1129
                                                return true
×
1130
                                        },
×
1131
                                        func(dn string) {
×
1132
                                                cont.vpcIfDeleted(dn)
×
1133
                                        })
×
1134
                        }
1135
                }
1136

1137
                cont.apicConn.VersionUpdateHook =
1✔
1138
                        func() {
1✔
1139
                                cont.initStaticServiceObjs()
×
1140
                        }
×
1141
        }
1142
        go cont.apicConn.Run(stopCh)
1✔
1143
}
1144

1145
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1146
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
1147
        ticker := time.NewTicker(seconds * time.Second)
1✔
1148
        defer ticker.Stop()
1✔
1149

1✔
1150
        for {
2✔
1151
                select {
1✔
1152
                case <-ticker.C:
1✔
1153
                        if cont.config.EnableOpflexAgentReconnect {
1✔
1154
                                cont.checkChangeOfOpflexOdevAciPod()
×
1155
                        }
×
1156
                        if cont.config.AciMultipod {
1✔
1157
                                cont.checkChangeOfOdevAciPod()
×
1158
                        }
×
1159
                case <-stopCh:
1✔
1160
                        return
1✔
1161
                }
1162
        }
1163
}
1164

1165
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1166
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
1167
        ticker := time.NewTicker(seconds * time.Second)
1✔
1168
        defer ticker.Stop()
1✔
1169

1✔
1170
        for {
2✔
1171
                select {
1✔
1172
                case <-ticker.C:
1✔
1173
                        cont.deleteOldOpflexDevices()
1✔
1174
                case <-stopCh:
1✔
1175
                        return
1✔
1176
                }
1177
        }
1178
}
1179

1180
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1181
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
1182
        ticker := time.NewTicker(seconds * time.Second)
1✔
1183
        defer ticker.Stop()
1✔
1184

1✔
1185
        for {
2✔
1186
                select {
1✔
1187
                case <-ticker.C:
1✔
1188
                        cont.processDelayedEpSlices()
1✔
1189
                case <-stopCh:
1✔
1190
                        return
1✔
1191
                }
1192
        }
1193
}
1194

1195
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
1196
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
1197
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
1198
        iteration := 0
1✔
1199
        for {
2✔
1200
                // To avoid noisy logs, only printing once in 5 minutes
1✔
1201
                if iteration%5 == 0 {
2✔
1202
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
1203
                }
1✔
1204
                var nodeInfos []*nodeinfo.NodeInfo
1✔
1205
                cont.indexMutex.Lock()
1✔
1206
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
1207
                        func(nodeInfoObj interface{}) {
2✔
1208
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
1209
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
1210
                        })
1✔
1211
                expectedmap := make(map[string]map[string]bool)
1✔
1212
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
1213
                        for nodename, entry := range glinfo {
2✔
1214
                                if _, found := expectedmap[nodename]; !found {
2✔
1215
                                        newentry := make(map[string]bool)
1✔
1216
                                        newentry[entry.SnatPolicyName] = true
1✔
1217
                                        expectedmap[nodename] = newentry
1✔
1218
                                } else {
2✔
1219
                                        currententry := expectedmap[nodename]
1✔
1220
                                        currententry[entry.SnatPolicyName] = true
1✔
1221
                                        expectedmap[nodename] = currententry
1✔
1222
                                }
1✔
1223
                        }
1224
                }
1225
                cont.indexMutex.Unlock()
1✔
1226

1✔
1227
                for _, value := range nodeInfos {
2✔
1228
                        marked := false
1✔
1229
                        policyNames := value.Spec.SnatPolicyNames
1✔
1230
                        nodeName := value.ObjectMeta.Name
1✔
1231
                        _, ok := expectedmap[nodeName]
1✔
1232
                        if !ok && len(policyNames) > 0 {
2✔
1233
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
1234
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
1235
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1236
                                marked = true
1✔
1237
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
1238
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1239
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1240
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1241
                                marked = true
1✔
1242
                        } else {
2✔
1243
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
1244
                                        // No snatpolicies present
×
1245
                                        continue
×
1246
                                }
1247
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
1248
                                if !eq {
2✔
1249
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1250
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1251
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1252
                                        marked = true
1✔
1253
                                }
1✔
1254
                        }
1255
                        if marked {
2✔
1256
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
1257
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
1258
                                if err != nil {
1✔
1259
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
1260
                                        continue
×
1261
                                }
1262
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1263
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1264
                        } else if iteration%5 == 0 {
2✔
1265
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1266
                        }
1✔
1267
                }
1268
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1269
                iteration++
1✔
1270
        }
1271
}
1272

1273
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1274
        queueStop <-chan struct{}) {
1✔
1275
        go wait.Until(func() {
2✔
1276
                for {
2✔
1277
                        syncType, quit := queue.Get()
1✔
1278
                        if quit {
2✔
1279
                                break
1✔
1280
                        }
1281
                        var requeue bool
1✔
1282
                        if sType, ok := syncType.(string); ok {
2✔
1283
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1284
                                        requeue = f()
1✔
1285
                                }
1✔
1286
                        }
1287
                        if requeue {
1✔
1288
                                queue.AddRateLimited(syncType)
×
1289
                        } else {
1✔
1290
                                queue.Forget(syncType)
1✔
1291
                        }
1✔
1292
                        queue.Done(syncType)
1✔
1293
                }
1294
        }, time.Second, queueStop)
1295
        <-queueStop
1✔
1296
        queue.ShutDown()
1✔
1297
}
1298

1299
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1300
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1301
}
1✔
1302
func (cont *AciController) scheduleRdConfig() {
×
1303
        cont.syncQueue.AddRateLimited("rdConfig")
×
1304
}
×
1305
func (cont *AciController) scheduleCreateIstioCR() {
×
1306
        cont.syncQueue.AddRateLimited("istioCR")
×
1307
}
×
1308

1309
func (cont *AciController) addVmmInjectedLabel() {
1✔
1310
        if apicapi.ApicVersion >= "5.2" {
1✔
1311
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1312
                if err != nil {
×
1313
                        panic(err.Error())
×
1314
                }
1315
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1316
                if err != nil {
×
1317
                        panic(err.Error())
×
1318
                }
1319
        }
1320
        if apicapi.ApicVersion >= "5.0" {
2✔
1321
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1322
                if err != nil {
1✔
1323
                        panic(err.Error())
×
1324
                }
1325
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1326
                if err != nil {
1✔
1327
                        panic(err.Error())
×
1328
                }
1329
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1330
                if err != nil {
1✔
1331
                        panic(err.Error())
×
1332
                }
1333
        }
1334
}
1335

1336
func (cont *AciController) isCNOEnabled() bool {
1✔
1337
        return cont.config.ChainedMode || cont.config.VmmLite
1✔
1338
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc