• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11671

02 Mar 2026 07:51AM UTC coverage: 62.892% (+0.06%) from 62.83%
11671

push

travis-pro

web-flow
Merge pull request #1679 from noironetworks/backport-remove-epslice-check

[mmr-6.1.1] Remove deprecated Endpoints API fallback logic

0 of 9 new or added lines in 2 files covered. (0.0%)

99 existing lines in 1 file now uncovered.

13491 of 21451 relevant lines covered (62.89%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.0
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        v1net "k8s.io/api/networking/v1"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/util/workqueue"
40

41
        "github.com/noironetworks/aci-containers/pkg/apicapi"
42
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
43
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
44
        hppv1 "github.com/noironetworks/aci-containers/pkg/hpp/apis/aci.hpp/v1"
45
        "github.com/noironetworks/aci-containers/pkg/index"
46
        "github.com/noironetworks/aci-containers/pkg/ipam"
47
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
48
        "github.com/noironetworks/aci-containers/pkg/metadata"
49
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
50
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
51
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
52
        "github.com/noironetworks/aci-containers/pkg/util"
53
)
54

55
// podUpdateFunc takes a Pod and returns a Pod and an error. AciController
// keeps one in its updatePod field.
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)

// nodeUpdateFunc takes a Node and returns a Node and an error. AciController
// keeps one in its updateNode field.
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)

// serviceUpdateFunc takes a Service and returns a Service and an error.
// AciController keeps one in its updateServiceStatus field.
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)

// listNetworkPoliciesFunc returns a NetworkPolicyList for the given string
// argument (presumably a namespace name -- TODO confirm against callers).
type listNetworkPoliciesFunc func(string) (*v1net.NetworkPolicyList, error)

// listNamespacesFunc returns a NamespaceList and takes no arguments.
type listNamespacesFunc func() (*v1.NamespaceList, error)
60

61
// AciController is the central state object of this package. It bundles the
// logger, configuration and environment with the work queues, informers and
// indexers, IP bookkeeping, selector indexes, the APIC connection and the
// various caches. Use NewController to obtain an instance with its queues and
// maps allocated.
type AciController struct {
        log    *logrus.Logger
        config *ControllerConfig
        env    Environment

        // JSON encodings of config.DefaultEg and config.DefaultSg, set by Init
        defaultEg string
        defaultSg string

        // copied from the unittestmode argument of NewController
        unitTestMode bool

        // rate-limited work queues, allocated in NewController
        podQueue               workqueue.RateLimitingInterface
        netPolQueue            workqueue.RateLimitingInterface
        qosQueue               workqueue.RateLimitingInterface
        serviceQueue           workqueue.RateLimitingInterface
        snatQueue              workqueue.RateLimitingInterface
        netflowQueue           workqueue.RateLimitingInterface
        erspanQueue            workqueue.RateLimitingInterface
        snatNodeInfoQueue      workqueue.RateLimitingInterface
        rdConfigQueue          workqueue.RateLimitingInterface
        istioQueue             workqueue.RateLimitingInterface
        nodeFabNetAttQueue     workqueue.RateLimitingInterface
        netFabConfigQueue      workqueue.RateLimitingInterface
        nadVlanMapQueue        workqueue.RateLimitingInterface
        fabricVlanPoolQueue    workqueue.RateLimitingInterface
        netFabL3ConfigQueue    workqueue.RateLimitingInterface
        remIpContQueue         workqueue.RateLimitingInterface
        epgDnCacheUpdateQueue  workqueue.RateLimitingInterface
        aaepMonitorConfigQueue workqueue.RateLimitingInterface

        // informers and their indexers, API callbacks, clients and event poster
        namespaceIndexer                     cache.Indexer
        namespaceInformer                    cache.Controller
        podIndexer                           cache.Indexer
        podInformer                          cache.Controller
        endpointsIndexer                     cache.Indexer
        endpointsInformer                    cache.Controller
        serviceIndexer                       cache.Indexer
        serviceInformer                      cache.Controller
        replicaSetIndexer                    cache.Indexer
        replicaSetInformer                   cache.Controller
        deploymentIndexer                    cache.Indexer
        deploymentInformer                   cache.Controller
        nodeIndexer                          cache.Indexer
        nodeInformer                         cache.Controller
        networkPolicyIndexer                 cache.Indexer
        networkPolicyInformer                cache.Controller
        snatIndexer                          cache.Indexer
        snatInformer                         cache.Controller
        snatNodeInfoIndexer                  cache.Indexer
        snatNodeInformer                     cache.Controller
        snatLocalInfoInformer                cache.Controller
        crdInformer                          cache.Controller
        rdConfigInformer                     cache.Controller
        rdConfigIndexer                      cache.Indexer
        qosIndexer                           cache.Indexer
        qosInformer                          cache.Controller
        netflowIndexer                       cache.Indexer
        netflowInformer                      cache.Controller
        erspanIndexer                        cache.Indexer
        erspanInformer                       cache.Controller
        nodePodIfIndexer                     cache.Indexer
        nodePodIfInformer                    cache.Controller
        istioIndexer                         cache.Indexer
        istioInformer                        cache.Controller
        endpointSliceIndexer                 cache.Indexer
        endpointSliceInformer                cache.Controller
        snatCfgInformer                      cache.Controller
        updatePod                            podUpdateFunc
        updateNode                           nodeUpdateFunc
        updateServiceStatus                  serviceUpdateFunc
        listNetworkPolicies                  listNetworkPoliciesFunc
        listNamespaces                       listNamespacesFunc
        nodeFabNetAttInformer                cache.SharedIndexInformer
        netFabConfigInformer                 cache.SharedIndexInformer
        nadVlanMapInformer                   cache.SharedIndexInformer
        fabricVlanPoolInformer               cache.SharedIndexInformer
        networkFabricL3ConfigurationInformer cache.SharedIndexInformer
        fabNetAttClient                      *fabattclset.Clientset
        proactiveConfInformer                cache.SharedIndexInformer
        aaepMonitorInformer                  cache.SharedIndexInformer
        poster                               *EventPoster

        // NOTE(review): which fields each mutex guards is not visible in this
        // part of the file -- confirm before relying on either lock.
        indexMutex sync.Mutex
        hppMutex   sync.Mutex

        // IP address bookkeeping, allocated in NewController
        configuredPodNetworkIps *netIps
        podNetworkIps           *netIps
        serviceIps              *ipam.IpCache
        staticServiceIps        *netIps
        nodeServiceIps          *netIps

        // index of pods matched by deployments
        depPods *index.PodSelectorIndex
        // index of pods matched by network policies
        netPolPods *index.PodSelectorIndex
        // index of pods matched by network policy ingress rules
        netPolIngressPods *index.PodSelectorIndex
        // index of pods matched by network policy egress rules
        netPolEgressPods *index.PodSelectorIndex
        // index of IP addresses contained in endpoints objects
        endpointsIpIndex cidranger.Ranger
        // index of service target ports
        targetPortIndex map[string]*portIndexEntry
        // index of services with named target ports
        namedPortServiceIndex map[string]*namedPortServiceIndexEntry
        // index of ip blocks referenced by network policy egress rules
        netPolSubnetIndex cidranger.Ranger
        // index of pods matched by erspan policies
        erspanPolPods *index.PodSelectorIndex

        // connection object from the apicapi package
        apicConn *apicapi.ApicConnection

        // caches; the maps are allocated in NewController
        nodeServiceMetaCache map[string]*nodeServiceMeta
        nodeACIPod           map[string]aciPodAnnot
        nodeACIPodAnnot      map[string]aciPodAnnot
        nodeOpflexDevice     map[string]apicapi.ApicSlice
        nodePodNetCache      map[string]*nodePodNetMeta
        serviceMetaCache     map[string]*serviceMeta
        snatPolicyCache      map[string]*ContSnatPolicy
        delayedEpSlices      []*DelayedEpSlice
        snatServices         map[string]bool
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
        rdConfigCache        map[string]*rdConfig.RdConfig
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
        istioCache           map[string]*istiov1.AciIstioOperator
        podIftoEp            map[string]*EndPointData
        // Node Name and Policy Name
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
        nodeSyncEnabled     bool
        serviceSyncEnabled  bool
        snatSyncEnabled     bool
        syncQueue           workqueue.RateLimitingInterface
        syncProcessors      map[string]func() bool
        serviceEndPoints    ServiceEndPointType
        crdHandlers         map[string]func(*AciController, <-chan struct{})
        stopCh              <-chan struct{}
        // index of containerportname to ctrPortNameEntry
        ctrPortNameCache map[string]*ctrPortNameEntry
        // named networkPolicies
        nmPortNp map[string]bool
        // maps network policy hash to hpp
        hppRef map[string]hppReference
        // map for ns to remoteIpConts
        nsRemoteIpCont map[string]remoteIpConts
        // cache to look for Epg DNs which are bound to Vmm domain
        cachedEpgDns             []string
        vmmClusterFaultSupported bool
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
        // Used in Shared mode
        sharedEncapCache       map[int]*sharedEncapData
        sharedEncapAepCache    map[string]map[int]bool
        sharedEncapSviCache    map[int]*NfL3Data
        sharedEncapVrfCache    map[string]*NfVrfData
        sharedEncapTenantCache map[string]*NfTenantData
        nfl3configGenerationId int64
        // vlan to propertiesList
        sharedEncapNfcCache         map[int]*NfcData
        sharedEncapNfcVlanMap       map[int]*NfcData
        sharedEncapNfcLabelMap      map[string]*NfcData
        sharedEncapNfcAppProfileMap map[string]bool
        // nadVlanMap encapLabel to vlan
        sharedEncapLabelMap      map[string][]int
        lldpIfCache              map[string]*NfLLDPIfData
        globalVlanConfig         globalVlanConfig
        fabricVlanPoolMap        map[string]map[string]string
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
        hostFabricPathDnMap      map[string]hostFabricInfo
        openStackSystemId        string
        sharedAaepMonitor        map[string]map[string]*AaepEpgAttachData
}
230

231
// hostFabricInfo is the value type of AciController.hostFabricPathDnMap. It
// groups a fabric path DN and a host string with a set of VPC interface DNs.
type hostFabricInfo struct {
        fabricPathDn string
        host         string
        // used as a set: only the keys carry information
        vpcIfDn map[string]struct{}
}
236

237
// NfLLDPIfData is the value type of AciController.lldpIfCache. It pairs an
// LLDP interface string with the set of references to it.
type NfLLDPIfData struct {
        LLDPIf string
        // As of now, references are managed at the NAD level; tracking at a
        // more granular level would add interface-tracking complexity without
        // sufficient benefit.
        Refs map[string]bool
}
244

245
// NfL3OutData is the per-L3Out record stored in NfTenantData.L3OutConfig.
type NfL3OutData struct {
        // +kubebuilder:validation:Enum:"import"
        RtCtrl string
        PodId  int
        // router nodes, keyed by an int (presumably the node id -- TODO confirm)
        RtrNodeMap map[int]*fabattv1.FabricL3OutRtrNode
        // policy prefix groups, keyed by a string (presumably the external EPG
        // name -- TODO confirm)
        ExtEpgMap map[string]*fabattv1.PolicyPrefixGroup
        SviMap    map[int]bool
}
253

254
// NfTenantData is the per-tenant record stored in NfVrfData.TenantConfig and
// in AciController.sharedEncapTenantCache.
type NfTenantData struct {
        CommonTenant     bool
        L3OutConfig      map[string]*NfL3OutData
        BGPPeerPfxConfig map[string]*fabattv1.BGPPeerPrefixPolicy
}
259

260
// NfVrfData is the value type of AciController.sharedEncapVrfCache; it holds
// a map of NfTenantData records.
type NfVrfData struct {
        TenantConfig map[string]*NfTenantData
}
263

264
// NfL3Networks embeds a fabattv1.PrimaryNetwork and adds a map of
// FabricL3Subnet entries. NfL3Data.ConnectedNw points to a value of this type.
type NfL3Networks struct {
        fabattv1.PrimaryNetwork
        Subnets map[string]*fabattv1.FabricL3Subnet
}
268

269
// NfL3Data is the value type of AciController.sharedEncapSviCache, which is
// keyed by an int.
type NfL3Data struct {
        Tenant      string
        Vrf         fabattv1.VRF
        PodId       int
        ConnectedNw *NfL3Networks
        NetAddr     map[string]*RoutedNetworkData
        Nodes       map[int]fabattv1.FabricL3OutNode
}
277

278
// remoteIpConts maps pod name to remoteIpCont. It is the value type of
// AciController.nsRemoteIpCont.
type remoteIpConts map[string]remoteIpCont
280

281
// remoteIpCont maps an IP (as a string) to pod labels (label key to label
// value).
type remoteIpCont map[string]map[string]string
283

284
// NfcData is the value type of the sharedEncapNfcCache, sharedEncapNfcVlanMap
// and sharedEncapNfcLabelMap fields of AciController. It pairs a set of AEP
// strings with a fabattv1.Epg.
type NfcData struct {
        Aeps map[string]bool
        Epg  fabattv1.Epg
}
288

289
// sharedEncapData is the value type of AciController.sharedEncapCache, which
// is keyed by an int.
type sharedEncapData struct {
        // node to NAD to pods
        Pods   map[string]map[string][]string
        NetRef map[string]*AdditionalNetworkMeta
        Aeps   map[string]bool
}
295

296
// globalVlanConfig holds two APIC objects, SharedPhysDom and SharedL3Dom. It
// is the type of the AciController field of the same name.
type globalVlanConfig struct {
        SharedPhysDom apicapi.ApicObject
        SharedL3Dom   apicapi.ApicObject
}
300

301
// hppReference is the value type of AciController.hppRef. Every field has a
// JSON tag with omitempty, so values can be serialized with encoding/json.
type hppReference struct {
        RefCount uint              `json:"ref-count,omitempty"`
        Npkeys   []string          `json:"npkeys,omitempty"`
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
        HppCr    hppv1.HostprotPol `json:"hpp-cr,omitempty"`
}
307

308
// DelayedEpSlice is the element type of AciController.delayedEpSlices. It
// records a service key, an old and a new EndpointSlice, and a timestamp.
type DelayedEpSlice struct {
        ServiceKey  string
        OldEpSlice  *discovery.EndpointSlice
        NewEpSlice  *discovery.EndpointSlice
        DelayedTime time.Time
}
314

315
// aciPodAnnot is the value type of the nodeACIPod and nodeACIPodAnnot maps of
// AciController. It holds an ACI pod string and two timestamps.
type aciPodAnnot struct {
        aciPod         string
        disconnectTime time.Time
        lastErrorTime  time.Time
}
320

321
// nodeServiceMeta is the value type of AciController.nodeServiceMetaCache; it
// wraps a metadata.ServiceEndpoint.
type nodeServiceMeta struct {
        serviceEp metadata.ServiceEndpoint
}
324

325
// nodePodNetMeta is the value type of AciController.nodePodNetCache. Use
// newNodePodNetMeta to create one with nodePods allocated.
type nodePodNetMeta struct {
        nodePods            map[string]bool
        podNetIps           metadata.NetIps
        podNetIpsAnnotation string
}
330

331
// openstackOpflexOdevInfo is the value type of
// AciController.openStackFabricPathDnMap.
type openstackOpflexOdevInfo struct {
        // used as a set: only the keys carry information
        opflexODevDn map[string]struct{}
        fabricPathDn string
}
335

336
// serviceMeta is the value type of AciController.serviceMetaCache. It holds
// three lists of IP addresses associated with a service.
type serviceMeta struct {
        requestedIps     []net.IP
        ingressIps       []net.IP
        staticIngressIps []net.IP
}
341

342
// ipIndexEntry associates an IP network with a set of string keys. Its
// Network method returns ipNet.
type ipIndexEntry struct {
        ipNet net.IPNet
        keys  map[string]bool
}
346

347
// targetPort combines a protocol with a set of port numbers. It is the type
// of portIndexEntry.port.
type targetPort struct {
        proto v1.Protocol
        ports map[int]bool
}
351

352
// portIndexEntry is the value type of AciController.targetPortIndex. It ties
// a targetPort to a set of service keys and a set of network policy keys.
type portIndexEntry struct {
        port              targetPort
        serviceKeys       map[string]bool
        networkPolicyKeys map[string]bool
}
357

358
// namedPortServiceIndexPort pairs a target port name with a set of resolved
// port numbers. It is the value type of namedPortServiceIndexEntry.
type namedPortServiceIndexPort struct {
        targetPortName string
        resolvedPorts  map[int]bool
}
362

363
// namedPortServiceIndexEntry maps a string key to a namedPortServiceIndexPort.
// AciController.namedPortServiceIndex stores pointers to values of this type.
type namedPortServiceIndexEntry map[string]*namedPortServiceIndexPort
364

365
// portRangeSnat describes a port range by its start and end values.
// NOTE(review): whether end is inclusive is not visible here -- confirm at
// the use sites.
type portRangeSnat struct {
        start int
        end   int
}
369

370
// EndPointData holds PodIF data in controller. It is the value type of
// AciController.podIftoEp.
type EndPointData struct {
        MacAddr    string
        EPG        string
        Namespace  string
        AppProfile string
}
377

378
// ctrPortNameEntry is the value type of AciController.ctrPortNameCache, which
// indexes it by container port name.
type ctrPortNameEntry struct {
        // Proto+port->pods; the inner map is used as a set of pods
        ctrNmpToPods map[string]map[string]bool
}
382

383
// LinkData holds a list of link strings and a list of pod strings. It is the
// innermost value type of AdditionalNetworkMeta.FabricLink.
type LinkData struct {
        Link []string
        Pods []string
}
387

388
// RoutedNodeData pairs an address string with an integer index. It is the
// value type of RoutedNetworkData.nodeMap.
type RoutedNodeData struct {
        addr string
        idx  int
}
392

393
// RoutedNetworkData is the value type of NfL3Data.NetAddr. It describes a
// network (subnet, address, mask length, base address) together with
// allocation counters and per-node and per-index maps.
// NOTE(review): the exact allocation scheme behind numAllocated, maxAddresses
// and availableMap is not visible in this part of the file -- confirm.
type RoutedNetworkData struct {
        subnet       string
        netAddress   string
        maskLen      int
        numAllocated int
        maxAddresses int
        baseAddress  net.IP
        nodeMap      map[string]RoutedNodeData
        availableMap map[int]bool
}
403

404
// AdditionalNetworkMeta is the value type of
// AciController.additionalNetworkCache and of sharedEncapData.NetRef.
type AdditionalNetworkMeta struct {
        NetworkName string
        EncapVlan   string
        // node+localiface->fabricLinks
        FabricLink map[string]map[string]LinkData
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
        Mode       util.EncapMode
}
412

413
// ServiceEndPointType is the interface behind AciController.serviceEndPoints.
// Init installs a *serviceEndpointSlice as the implementation. This file also
// defines InitClientInformer, Run and Wait on *serviceEndpoint.
type ServiceEndPointType interface {
        // InitClientInformer sets up the implementation's informer from the
        // given clientset.
        InitClientInformer(kubeClient *kubernetes.Clientset)
        // Run starts the implementation's informer; stopCh is passed on to it.
        Run(stopCh <-chan struct{})
        // Wait blocks until the relevant informer caches report synced or
        // stopCh is closed.
        Wait(stopCh <-chan struct{})
        UpdateServicesForNode(nodename string)
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
}
423

424
// serviceEndpoint carries a back-pointer to the controller. Its methods in
// this file use the controller's Endpoints informer.
type serviceEndpoint struct {
        cont *AciController
}
427
// serviceEndpointSlice carries a back-pointer to the controller. Its methods
// in this file use the controller's EndpointSlice informer. Init installs a
// value of this type as AciController.serviceEndPoints.
type serviceEndpointSlice struct {
        cont *AciController
}
430

431
// AaepEpgAttachData is the innermost value type of
// AciController.sharedAaepMonitor. It records an encap VLAN, a NAD name and
// namespace, and whether the NAD has been created.
type AaepEpgAttachData struct {
        encapVlan     int
        nadName       string
        namespaceName string
        nadCreated    bool
}
437

438
// EpgVlanMap pairs an EPG DN with an encap VLAN number.
type EpgVlanMap struct {
        epgDn     string
        encapVlan int
}
442

443
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
444
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
445
}
×
446

447
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
448
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
449
}
×
450

451
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
452
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
453
}
1✔
454

455
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
456
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
457
}
1✔
458

459
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
460
        cache.WaitForCacheSync(stopCh,
1✔
461
                sep.cont.endpointsInformer.HasSynced,
1✔
462
                sep.cont.serviceInformer.HasSynced)
1✔
463
}
1✔
464

465
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
466
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
467
        cache.WaitForCacheSync(stopCh,
1✔
468
                seps.cont.endpointSliceInformer.HasSynced,
1✔
469
                seps.cont.serviceInformer.HasSynced)
1✔
470
}
1✔
471

472
func (e *ipIndexEntry) Network() net.IPNet {
1✔
473
        return e.ipNet
1✔
474
}
1✔
475

476
func newNodePodNetMeta() *nodePodNetMeta {
1✔
477
        return &nodePodNetMeta{
1✔
478
                nodePods: make(map[string]bool),
1✔
479
        }
1✔
480
}
1✔
481

482
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
483
        return workqueue.NewNamedRateLimitingQueue(
1✔
484
                workqueue.NewMaxOfRateLimiter(
1✔
485
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
486
                                10*time.Second),
1✔
487
                        &workqueue.BucketRateLimiter{
1✔
488
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
489
                        },
1✔
490
                ),
1✔
491
                "delta")
1✔
492
}
1✔
493

494
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
495
        cont := &AciController{
1✔
496
                log:          log,
1✔
497
                config:       config,
1✔
498
                env:          env,
1✔
499
                defaultEg:    "",
1✔
500
                defaultSg:    "",
1✔
501
                unitTestMode: unittestmode,
1✔
502

1✔
503
                podQueue:               createQueue("pod"),
1✔
504
                netPolQueue:            createQueue("networkPolicy"),
1✔
505
                qosQueue:               createQueue("qos"),
1✔
506
                netflowQueue:           createQueue("netflow"),
1✔
507
                erspanQueue:            createQueue("erspan"),
1✔
508
                serviceQueue:           createQueue("service"),
1✔
509
                snatQueue:              createQueue("snat"),
1✔
510
                snatNodeInfoQueue:      createQueue("snatnodeinfo"),
1✔
511
                rdConfigQueue:          createQueue("rdconfig"),
1✔
512
                istioQueue:             createQueue("istio"),
1✔
513
                nodeFabNetAttQueue:     createQueue("nodefabricnetworkattachment"),
1✔
514
                netFabConfigQueue:      createQueue("networkfabricconfiguration"),
1✔
515
                nadVlanMapQueue:        createQueue("nadvlanmap"),
1✔
516
                fabricVlanPoolQueue:    createQueue("fabricvlanpool"),
1✔
517
                netFabL3ConfigQueue:    createQueue("networkfabricl3configuration"),
1✔
518
                remIpContQueue:         createQueue("remoteIpContainer"),
1✔
519
                epgDnCacheUpdateQueue:  createQueue("epgDnCache"),
1✔
520
                aaepMonitorConfigQueue: createQueue("aaepepgmap"),
1✔
521
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
522
                        &workqueue.BucketRateLimiter{
1✔
523
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
524
                        }, "sync"),
1✔
525

1✔
526
                configuredPodNetworkIps: newNetIps(),
1✔
527
                podNetworkIps:           newNetIps(),
1✔
528
                serviceIps:              ipam.NewIpCache(),
1✔
529
                staticServiceIps:        newNetIps(),
1✔
530
                nodeServiceIps:          newNetIps(),
1✔
531

1✔
532
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
533
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
534
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
535

1✔
536
                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
1✔
537
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
1✔
538
                serviceMetaCache:            make(map[string]*serviceMeta),
1✔
539
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
1✔
540
                snatServices:                make(map[string]bool),
1✔
541
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
1✔
542
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
1✔
543
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
1✔
544
                podIftoEp:                   make(map[string]*EndPointData),
1✔
545
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
546
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
1✔
547
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
1✔
548
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
1✔
549
                nmPortNp:                    make(map[string]bool),
1✔
550
                hppRef:                      make(map[string]hppReference),
1✔
551
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
1✔
552
                sharedEncapCache:            make(map[int]*sharedEncapData),
1✔
553
                sharedEncapAepCache:         make(map[string]map[int]bool),
1✔
554
                sharedEncapSviCache:         make(map[int]*NfL3Data),
1✔
555
                sharedEncapVrfCache:         make(map[string]*NfVrfData),
1✔
556
                sharedEncapTenantCache:      make(map[string]*NfTenantData),
1✔
557
                sharedEncapNfcCache:         make(map[int]*NfcData),
1✔
558
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
1✔
559
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
1✔
560
                sharedEncapNfcAppProfileMap: make(map[string]bool),
1✔
561
                sharedEncapLabelMap:         make(map[string][]int),
1✔
562
                lldpIfCache:                 make(map[string]*NfLLDPIfData),
1✔
563
                fabricVlanPoolMap:           make(map[string]map[string]string),
1✔
564
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
1✔
565
                hostFabricPathDnMap:         make(map[string]hostFabricInfo),
1✔
566
                nsRemoteIpCont:              make(map[string]remoteIpConts),
1✔
567
                sharedAaepMonitor:           make(map[string]map[string]*AaepEpgAttachData),
1✔
568
        }
1✔
569
        cont.syncProcessors = map[string]func() bool{
1✔
570
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
571
                "rdConfig":       cont.syncRdConfig,
1✔
572
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
573
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
574
                   and aci-containers-operator images for istio.io/istio package
1✔
575

1✔
576
                "istioCR":        cont.createIstioCR,
1✔
577
                */
1✔
578
        }
1✔
579
        return cont
1✔
580
}
1✔
581

582
func (cont *AciController) Init() {
×
583
        if cont.config.ChainedMode {
×
584
                cont.log.Info("In chained mode")
×
585
        }
×
586
        if cont.config.VmmLite {
×
587
                cont.log.Info("In VMM lite mode")
×
588
        }
×
589

590
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
591
        if err != nil {
×
592
                cont.log.Error("Could not serialize default endpoint group")
×
593
                panic(err.Error())
×
594
        }
595
        cont.defaultEg = string(egdata)
×
596

×
597
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
598
        if err != nil {
×
599
                cont.log.Error("Could not serialize default security groups")
×
600
                panic(err.Error())
×
601
        }
602
        cont.defaultSg = string(sgdata)
×
603

×
604
        cont.log.Debug("Initializing IPAM")
×
605
        cont.initIpam()
×
NEW
606

×
NEW
607
        cont.serviceEndPoints = &serviceEndpointSlice{}
×
NEW
608
        cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
NEW
609
        cont.log.Info("Initializing ServiceEndpointSlices")
×
610

×
611
        err = cont.env.Init(cont)
×
612
        if err != nil {
×
613
                panic(err.Error())
×
614
        }
615
}
616

617
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
618
        store cache.Store, handler func(interface{}) bool,
619
        deleteHandler func(string) bool,
620
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
621
        go wait.Until(func() {
2✔
622
                for {
2✔
623
                        key, quit := queue.Get()
1✔
624
                        if quit {
2✔
625
                                break
1✔
626
                        }
627

628
                        var requeue bool
1✔
629
                        switch key := key.(type) {
1✔
630
                        case chan struct{}:
×
631
                                close(key)
×
632
                        case string:
1✔
633
                                if strings.HasPrefix(key, "DELETED_") {
2✔
634
                                        delKey := strings.Trim(key, "DELETED_")
1✔
635
                                        requeue = deleteHandler(delKey)
1✔
636
                                } else {
2✔
637
                                        obj, exists, err := store.GetByKey(key)
1✔
638
                                        if err != nil {
1✔
639
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
640
                                        }
×
641
                                        //Handle Add/Update/Delete
642
                                        if exists && handler != nil {
2✔
643
                                                requeue = handler(obj)
1✔
644
                                        }
1✔
645
                                        //Handle Post Delete
646
                                        if !exists && postDelHandler != nil {
1✔
647
                                                requeue = postDelHandler()
×
648
                                        }
×
649
                                }
650
                        }
651
                        if requeue {
2✔
652
                                queue.AddRateLimited(key)
1✔
653
                        } else {
2✔
654
                                queue.Forget(key)
1✔
655
                        }
1✔
656
                        queue.Done(key)
1✔
657
                }
658
        }, time.Second, stopCh)
659
        <-stopCh
1✔
660
        queue.ShutDown()
1✔
661
}
662

663
func (cont *AciController) processRemIpContQueue(queue workqueue.RateLimitingInterface,
664
        handler func(interface{}) bool,
665
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
666
        go wait.Until(func() {
2✔
667
                for {
2✔
668
                        key, quit := queue.Get()
1✔
669
                        if quit {
2✔
670
                                break
1✔
671
                        }
672

673
                        var requeue bool
1✔
674
                        switch key := key.(type) {
1✔
675
                        case chan struct{}:
×
676
                                close(key)
×
677
                        case string:
1✔
678
                                if handler != nil {
2✔
679
                                        requeue = handler(key)
1✔
680
                                }
1✔
681
                                if postDelHandler != nil {
2✔
682
                                        requeue = postDelHandler()
1✔
683
                                }
1✔
684
                        }
685
                        if requeue {
1✔
686
                                queue.AddRateLimited(key)
×
687
                        } else {
1✔
688
                                queue.Forget(key)
1✔
689
                        }
1✔
690
                        queue.Done(key)
1✔
691

692
                }
693
        }, time.Second, stopCh)
694
        <-stopCh
1✔
695
        queue.ShutDown()
1✔
696
}
697

698
func (cont *AciController) processEpgDnCacheUpdateQueue(queue workqueue.RateLimitingInterface,
699
        handler func(interface{}) bool,
700
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
701
        go wait.Until(func() {
2✔
702
                for {
2✔
703
                        key, quit := queue.Get()
1✔
704
                        if quit {
2✔
705
                                break
1✔
706
                        }
707

708
                        var requeue bool
1✔
709
                        switch key := key.(type) {
1✔
710
                        case chan struct{}:
×
711
                                close(key)
×
712
                        case bool:
1✔
713
                                if handler != nil {
2✔
714
                                        requeue = handler(key)
1✔
715
                                }
1✔
716
                                if postDelHandler != nil {
1✔
717
                                        requeue = postDelHandler()
×
718
                                }
×
719
                        }
720
                        if requeue {
1✔
721
                                queue.AddRateLimited(key)
×
722
                        } else {
1✔
723
                                queue.Forget(key)
1✔
724
                        }
1✔
725
                        queue.Done(key)
1✔
726

727
                }
728
        }, time.Second, stopCh)
729
        <-stopCh
1✔
730
        queue.ShutDown()
1✔
731
}
732

733
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
734
        return apicapi.ApicSlice{}
1✔
735
}
1✔
736

737
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
738
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
739
}
1✔
740

741
func (cont *AciController) initStaticObjs() {
1✔
742
        cont.env.InitStaticAciObjects()
1✔
743
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
744
                cont.globalStaticObjs())
1✔
745
}
1✔
746

747
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
748
        vmmProv = "Kubernetes"
1✔
749
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
750
                vmmProv = "OpenShift"
×
751
        }
×
752
        return
1✔
753
}
754

755
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
756
        var err error
1✔
757
        var privKey []byte
1✔
758
        var apicCert []byte
1✔
759

1✔
760
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
761

1✔
762
        if cont.config.ApicPrivateKeyPath != "" {
1✔
763
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
764
                if err != nil {
×
765
                        panic(err)
×
766
                }
767
        }
768
        if cont.config.ApicCertPath != "" {
1✔
769
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
770
                if err != nil {
×
771
                        panic(err)
×
772
                }
773
        }
774
        // If not defined, default is 1800
775
        if cont.config.ApicRefreshTimer == "" {
2✔
776
                cont.config.ApicRefreshTimer = "1800"
1✔
777
        }
1✔
778
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
779
        if err != nil {
1✔
780
                panic(err)
×
781
        }
782
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
783

1✔
784
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
785
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
786
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
787
                panic(err)
×
788
        }
789

790
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
791
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
792
                cont.config.ApicRefreshTickerAdjust = "210"
1✔
793
        }
1✔
794
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
795
        if err != nil {
1✔
796
                panic(err)
×
797
        }
798

799
        // If not defined, default to 900s
800
        if cont.config.LeafRebootCheckInterval == 0 {
2✔
801
                cont.config.LeafRebootCheckInterval = 900
1✔
802
        }
1✔
803

804
        //If ApicSubscriptionDelay is not defined, default to 100ms
805
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
806
                cont.config.ApicSubscriptionDelay = 100
1✔
807
        }
1✔
808
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
809

1✔
810
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
811
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
812
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
813
        }
1✔
814

815
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
816
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
817
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
818
        }
1✔
819
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
820

1✔
821
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
822
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
823
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
824
        }
1✔
825

826
        // If not defined, default to 32
827
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
828
                cont.config.PodIpPoolChunkSize = 32
1✔
829
        }
1✔
830
        if !cont.isCNOEnabled() {
2✔
831
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
832
        }
1✔
833

834
        // If ApicConnectionRetryLimit is not defined, default to 5
835
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
836
                cont.config.ApicConnectionRetryLimit = 5
1✔
837
        }
1✔
838
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
839

1✔
840
        // If not valid, default to 5000-65000
1✔
841
        // other permissible values 1-65000
1✔
842
        defStart := 5000
1✔
843
        defEnd := 65000
1✔
844
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
845
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
846
        }
1✔
847
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
848
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
849
        }
1✔
850
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
851
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
852
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
853
                cont.config.SnatDefaultPortRangeStart = defStart
×
854
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
855
        }
×
856

857
        // Set default value for pbr programming delay if services list is not empty
858
        // and delay value is empty
859
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
860
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
861
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
862
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
863
        }
×
864
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
865
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
866
        }
×
867

868
        // Set contract scope for snat svc graph to global by default
869
        if cont.config.SnatSvcContractScope == "" {
2✔
870
                cont.config.SnatSvcContractScope = "global"
1✔
871
        }
1✔
872
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
873
                cont.config.MaxSvcGraphNodes = 32
1✔
874
        }
1✔
875
        if !cont.isCNOEnabled() {
2✔
876
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
877
        }
1✔
878
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
879
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
880
                privKey, apicCert, cont.config.AciPrefix,
1✔
881
                refreshTimeout, refreshTickerAdjust, cont.config.LeafRebootCheckInterval, cont.config.ApicSubscriptionDelay,
1✔
882
                cont.config.AciVrfTenant, cont.UpdateLLDPIfLocked, cont.isCNOEnabled())
1✔
883
        if err != nil {
1✔
884
                panic(err)
×
885
        }
886

887
        cont.apicConn.FilterOpflexDevice = cont.config.FilterOpflexDevice
1✔
888
        cont.apicConn.Flavor = cont.config.Flavor
1✔
889
        cont.apicConn.VmmDomain = cont.config.AciVmmDomain
1✔
890
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
891
        cont.apicConn.RequestRetryDelay = cont.config.ApicRequestRetryDelay
1✔
892
        cont.apicConn.EnableRequestRetry = cont.config.EnableApicRequestRetry
1✔
893

1✔
894
        if len(cont.config.ApicHosts) != 0 {
1✔
895
        APIC_SWITCH:
×
896
                cont.log.WithFields(logrus.Fields{
×
897
                        "mod":  "APICAPI",
×
898
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
899
                }).Debug("Connecting to APIC to determine the Version")
×
900

×
901
                version, err := cont.apicConn.GetVersion()
×
902
                if err != nil {
×
903
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
904
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
905
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
906
                        goto APIC_SWITCH
×
907
                }
908
                cont.apicConn.CachedVersion = version
×
909
                apicapi.ApicVersion = version
×
910
                if version >= "4.2(4i)" {
×
911
                        cont.apicConn.SnatPbrFltrChain = true
×
912
                } else {
×
913
                        cont.apicConn.SnatPbrFltrChain = false
×
914
                }
×
915
                if version >= "5.2" {
×
916
                        cont.vmmClusterFaultSupported = true
×
917
                }
×
918
        } else { // For unit-tests
1✔
919
                cont.apicConn.SnatPbrFltrChain = true
1✔
920
        }
1✔
921

922
        if !cont.isCNOEnabled() {
2✔
923
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
924
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
925
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
926
                        var expectedVrfRelations []string
×
927
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
928
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
929
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
930
                        if err != nil {
×
931
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
932
                                panic(err)
×
933
                        }
934
                }
935
        }
936

937
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.isCNOEnabled() {
1✔
938
                //Clear fault instances when the controller starts
×
939
                cont.clearFaultInstances()
×
940
                //Subscribe for vmmEpPD for a given domain
×
941
                var tnTargetFilterEpg string
×
942
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
943
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
944
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
945
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
946
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
947
                        func(obj apicapi.ApicObject) bool {
×
948
                                cont.vmmEpPDChanged(obj)
×
949
                                return true
×
950
                        },
×
951
                        func(dn string) {
×
952
                                cont.vmmEpPDDeleted(dn)
×
953
                        })
×
954
        }
955

956
        cont.initStaticObjs()
1✔
957

1✔
958
        err = cont.env.PrepareRun(stopCh)
1✔
959
        if err != nil {
1✔
960
                panic(err.Error())
×
961
        }
962

963
        cont.apicConn.FullSyncHook = func() {
1✔
964
                // put a channel into each work queue and wait on it to
×
965
                // checkpoint object syncing in response to new subscription
×
966
                // updates
×
967
                cont.log.Debug("Starting checkpoint")
×
968
                var chans []chan struct{}
×
969
                qs := make([]workqueue.RateLimitingInterface, 0)
×
970
                _, ok := cont.env.(*K8sEnvironment)
×
971
                if ok {
×
972
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
973
                        if !cont.isCNOEnabled() {
×
974
                                if !cont.config.DisableHppRendering {
×
975
                                        qs = append(qs, cont.netPolQueue)
×
976
                                }
×
977
                                if cont.config.EnableHppDirect {
×
978
                                        qs = append(qs, cont.remIpContQueue)
×
979
                                }
×
980
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
981
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
982
                                        cont.rdConfigQueue, cont.erspanQueue,
×
983
                                        cont.epgDnCacheUpdateQueue)
×
984
                        }
985
                }
986
                for _, q := range qs {
×
987
                        c := make(chan struct{})
×
988
                        chans = append(chans, c)
×
989
                        q.Add(c)
×
990
                }
×
991
                for _, c := range chans {
×
992
                        <-c
×
993
                }
×
994
                cont.log.Debug("Checkpoint complete")
×
995
        }
996

997
        if len(cont.config.ApicHosts) != 0 && !cont.isCNOEnabled() {
1✔
998
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
999
                cont.scheduleRdConfig()
×
1000
                if strings.Contains(cont.config.Flavor, "openstack") {
×
1001
                        cont.setOpenStackSystemId()
×
1002
                }
×
1003
        }
1004

1005
        if !cont.isCNOEnabled() {
2✔
1006
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
1007
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1008
                                []string{"hostprotPol"})
1✔
1009
                }
1✔
1010
        } else {
1✔
1011
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1012
                        []string{"fvBD", "fvAp"})
1✔
1013
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
1014
                        []string{"fvnsVlanInstP"}, "")
1✔
1015
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
1016
                        []string{"infraRsDomP"}, "")
1✔
1017
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
1018
                        []string{"physDomP"}, "")
1✔
1019
                cont.apicConn.AddSubscriptionClass("l3extDomP",
1✔
1020
                        []string{"l3extDomP"}, "")
1✔
1021
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
1022
                        []string{"infraRsVlanNs"}, "")
1✔
1023
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
1024
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
1025
                cont.apicConn.AddSubscriptionClass("l3extOut",
1✔
1026
                        []string{"l3extInstP", "l3extSubnet", "fvRsCons", "fvRsProv", "l3extRsEctx", "l3extRsL3DomAtt", "l3extLNodeP", "l3extRsNodeL3OutAtt", "ipRouteP", "ipNexthopP", "l3extLIfP", "l3extVirtualLIfP", "l3extRsDynPathAtt",
1✔
1027
                                "l3extRsPathL3OutAtt", "l3extMember", "l3extIp", "bgpExtP", "bgpPeerP", "bgpAsP", "bgpLocalAsnP", "bgpRsPeerPfxPol"}, "")
1✔
1028
                cont.apicConn.AddSubscriptionClass("bgpPeerPfxPol",
1✔
1029
                        []string{"bgpPeerPfxPol"}, "")
1✔
1030
        }
1✔
1031

1032
        if cont.config.VmmLite {
1✔
1033
                cont.apicConn.AddSubscriptionClass("infraAttEntityP",
×
1034
                        []string{"infraRsFuncToEpg"}, "")
×
1035

×
1036
                cont.apicConn.SetSubscriptionHooks(
×
1037
                        "infraAttEntityP",
×
1038
                        func(obj apicapi.ApicObject) bool {
×
1039
                                cont.log.Debug("EPG attached to AAEP")
×
1040
                                cont.handleAaepEpgAttach(obj)
×
1041
                                return true
×
1042
                        },
×
1043
                        func(dn string) {
×
1044
                                cont.log.Debug("EPG detached from AAEP")
×
1045
                                cont.handleAaepEpgDetach(dn)
×
1046
                        },
×
1047
                )
1048
        }
1049

1050
        if !cont.isCNOEnabled() {
2✔
1051
                // Websocket notifications for objects under vrfTenant are filtered in
1✔
1052
                // handleSocketUpdate() using DN-based prefix matching (conn.prefix + "_").
1✔
1053
                // All MO names created via AciNameForKey contain this prefix pattern.
1✔
1054
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
1055
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
1056
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
1057
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
1058
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
1059
                }
×
1060
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
1061
                        subscribeMo)
1✔
1062
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
1063
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
1064
                        []string{"fvRsCons"})
1✔
1065
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
1066
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
1067
                        cont.config.AciVmmController)
1✔
1068
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
1069
                // Since it is not supported for APIC versions < "5.0"
1✔
1070
                cont.addVmmInjectedLabel()
1✔
1071
                cont.apicConn.AddSyncDn(vmmDn,
1✔
1072
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
1073

1✔
1074
                var tnTargetFilter string
1✔
1075
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
1076
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
1077
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
1078
                        }
×
1079
                } else {
1✔
1080
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
1081
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
1082
                }
1✔
1083
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
1084
                        tnTargetFilter)
1✔
1085
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
1086
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
1087

1✔
1088
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
1089
                        func(obj apicapi.ApicObject) bool {
1✔
1090
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
1091
                                return true
×
1092
                        },
×
1093
                        func(dn string) {
×
1094
                                cont.SubnetDeleted(dn)
×
1095
                        })
×
1096

1097
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
1098
                        []string{"opflexODev"}, "")
1✔
1099

1✔
1100
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
1101
                        func(obj apicapi.ApicObject) bool {
1✔
1102
                                cont.opflexDeviceChanged(obj)
×
1103
                                return true
×
1104
                        },
×
1105
                        func(dn string) {
×
1106
                                cont.opflexDeviceDeleted(dn)
×
1107
                        })
×
1108

1109
                if !cont.config.DisableServiceVlanPreprovisioning && !strings.Contains(cont.config.Flavor, "openstack") {
2✔
1110
                        if cont.config.AEP == "" {
2✔
1111
                                cont.log.Error("AEP is missing in configuration, preprovisioning of service vlan will be disabled")
1✔
1112
                        } else {
1✔
1113
                                infraRtAttEntPFilter := fmt.Sprintf("and(wcard(infraRtAttEntP.dn,\"/attentp-%s/\"))", cont.config.AEP)
×
1114
                                cont.apicConn.AddSubscriptionClass("infraRtAttEntP",
×
1115
                                        []string{"infraRtAttEntP"}, infraRtAttEntPFilter)
×
1116

×
1117
                                // For bare metal, the infraRtAttEntP associated with an AEP will be empty.
×
1118
                                // We should not receive any updates for such cases.
×
1119
                                cont.apicConn.SetSubscriptionHooks("infraRtAttEntP",
×
1120
                                        func(obj apicapi.ApicObject) bool {
×
1121
                                                cont.infraRtAttEntPChanged(obj)
×
1122
                                                return true
×
1123
                                        },
×
1124
                                        func(dn string) {
×
1125
                                                cont.infraRtAttEntPDeleted(dn)
×
1126
                                        })
×
1127

1128
                                cont.apicConn.AddSubscriptionClass("vpcIf",
×
1129
                                        []string{"vpcIf"}, "")
×
1130

×
1131
                                cont.apicConn.SetSubscriptionHooks("vpcIf",
×
1132
                                        func(obj apicapi.ApicObject) bool {
×
1133
                                                cont.vpcIfChanged(obj)
×
1134
                                                return true
×
1135
                                        },
×
1136
                                        func(dn string) {
×
1137
                                                cont.vpcIfDeleted(dn)
×
1138
                                        })
×
1139
                        }
1140
                }
1141

1142
                cont.apicConn.VersionUpdateHook =
1✔
1143
                        func() {
1✔
1144
                                cont.initStaticServiceObjs()
×
1145
                        }
×
1146
        } else if cont.config.VmmLite {
1✔
1147
                cont.apicConn.VMMLiteSyncHook = func() {
×
1148
                        cont.syncAndCleanNadCache()
×
1149
                        cont.syncAndCleanNads()
×
1150
                }
×
1151
        }
1152
        go cont.apicConn.Run(stopCh)
1✔
1153
}
1154

1155
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1156
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
1157
        ticker := time.NewTicker(seconds * time.Second)
1✔
1158
        defer ticker.Stop()
1✔
1159

1✔
1160
        for {
2✔
1161
                select {
1✔
1162
                case <-ticker.C:
1✔
1163
                        if cont.config.EnableOpflexAgentReconnect {
1✔
1164
                                cont.checkChangeOfOpflexOdevAciPod()
×
1165
                        }
×
1166
                        if cont.config.AciMultipod {
1✔
1167
                                cont.checkChangeOfOdevAciPod()
×
1168
                        }
×
1169
                case <-stopCh:
1✔
1170
                        return
1✔
1171
                }
1172
        }
1173
}
1174

1175
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1176
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
1177
        ticker := time.NewTicker(seconds * time.Second)
1✔
1178
        defer ticker.Stop()
1✔
1179

1✔
1180
        for {
2✔
1181
                select {
1✔
1182
                case <-ticker.C:
1✔
1183
                        cont.deleteOldOpflexDevices()
1✔
1184
                case <-stopCh:
1✔
1185
                        return
1✔
1186
                }
1187
        }
1188
}
1189

1190
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1191
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
1192
        ticker := time.NewTicker(seconds * time.Second)
1✔
1193
        defer ticker.Stop()
1✔
1194

1✔
1195
        for {
2✔
1196
                select {
1✔
1197
                case <-ticker.C:
1✔
1198
                        cont.processDelayedEpSlices()
1✔
1199
                case <-stopCh:
1✔
1200
                        return
1✔
1201
                }
1202
        }
1203
}
1204

1205
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
1206
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
1207
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
1208
        iteration := 0
1✔
1209
        for {
2✔
1210
                // To avoid noisy logs, only printing once in 5 minutes
1✔
1211
                if iteration%5 == 0 {
2✔
1212
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
1213
                }
1✔
1214
                var nodeInfos []*nodeinfo.NodeInfo
1✔
1215
                cont.indexMutex.Lock()
1✔
1216
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
1217
                        func(nodeInfoObj interface{}) {
2✔
1218
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
1219
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
1220
                        })
1✔
1221
                expectedmap := make(map[string]map[string]bool)
1✔
1222
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
1223
                        for nodename, entry := range glinfo {
2✔
1224
                                if _, found := expectedmap[nodename]; !found {
2✔
1225
                                        newentry := make(map[string]bool)
1✔
1226
                                        newentry[entry.SnatPolicyName] = true
1✔
1227
                                        expectedmap[nodename] = newentry
1✔
1228
                                } else {
2✔
1229
                                        currententry := expectedmap[nodename]
1✔
1230
                                        currententry[entry.SnatPolicyName] = true
1✔
1231
                                        expectedmap[nodename] = currententry
1✔
1232
                                }
1✔
1233
                        }
1234
                }
1235
                cont.indexMutex.Unlock()
1✔
1236

1✔
1237
                for _, value := range nodeInfos {
2✔
1238
                        marked := false
1✔
1239
                        policyNames := value.Spec.SnatPolicyNames
1✔
1240
                        nodeName := value.ObjectMeta.Name
1✔
1241
                        _, ok := expectedmap[nodeName]
1✔
1242
                        if !ok && len(policyNames) > 0 {
2✔
1243
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
1244
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
1245
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1246
                                marked = true
1✔
1247
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
1248
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1249
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1250
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1251
                                marked = true
1✔
1252
                        } else {
2✔
1253
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
1254
                                        // No snatpolicies present
×
1255
                                        continue
×
1256
                                }
1257
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
1258
                                if !eq {
2✔
1259
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1260
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1261
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1262
                                        marked = true
1✔
1263
                                }
1✔
1264
                        }
1265
                        if marked {
2✔
1266
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
1267
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
1268
                                if err != nil {
1✔
1269
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
1270
                                        continue
×
1271
                                }
1272
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1273
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1274
                        } else if iteration%5 == 0 {
2✔
1275
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1276
                        }
1✔
1277
                }
1278
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1279
                iteration++
1✔
1280
        }
1281
}
1282

1283
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1284
        queueStop <-chan struct{}) {
1✔
1285
        go wait.Until(func() {
2✔
1286
                for {
2✔
1287
                        syncType, quit := queue.Get()
1✔
1288
                        if quit {
2✔
1289
                                break
1✔
1290
                        }
1291
                        var requeue bool
1✔
1292
                        if sType, ok := syncType.(string); ok {
2✔
1293
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1294
                                        requeue = f()
1✔
1295
                                }
1✔
1296
                        }
1297
                        if requeue {
1✔
1298
                                queue.AddRateLimited(syncType)
×
1299
                        } else {
1✔
1300
                                queue.Forget(syncType)
1✔
1301
                        }
1✔
1302
                        queue.Done(syncType)
1✔
1303
                }
1304
        }, time.Second, queueStop)
1305
        <-queueStop
1✔
1306
        queue.ShutDown()
1✔
1307
}
1308

1309
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1310
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1311
}
1✔
1312
func (cont *AciController) scheduleRdConfig() {
×
1313
        cont.syncQueue.AddRateLimited("rdConfig")
×
1314
}
×
1315
func (cont *AciController) scheduleCreateIstioCR() {
×
1316
        cont.syncQueue.AddRateLimited("istioCR")
×
1317
}
×
1318

1319
func (cont *AciController) addVmmInjectedLabel() {
1✔
1320
        if apicapi.ApicVersion >= "5.2" {
1✔
1321
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1322
                if err != nil {
×
1323
                        panic(err.Error())
×
1324
                }
1325
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1326
                if err != nil {
×
1327
                        panic(err.Error())
×
1328
                }
1329
        }
1330
        if apicapi.ApicVersion >= "5.0" {
2✔
1331
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1332
                if err != nil {
1✔
1333
                        panic(err.Error())
×
1334
                }
1335
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1336
                if err != nil {
1✔
1337
                        panic(err.Error())
×
1338
                }
1339
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1340
                if err != nil {
1✔
1341
                        panic(err.Error())
×
1342
                }
1343
        }
1344
}
1345

1346
func (cont *AciController) isCNOEnabled() bool {
1✔
1347
        return cont.config.ChainedMode || cont.config.VmmLite
1✔
1348
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc