• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11144

22 Oct 2025 03:03PM UTC coverage: 65.289% (-0.6%) from 65.843%
11144

push

travis-pro

web-flow
Merge pull request #1597 from noironetworks/mmr-6.1.1-vmm_lite_bridge_nad

Add optional NAD bridge configuration fields to ControllerConfig

0 of 46 new or added lines in 1 file covered. (0.0%)

333 existing lines in 5 files now uncovered.

13383 of 20498 relevant lines covered (65.29%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.56
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        v1net "k8s.io/api/networking/v1"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/util/workqueue"
40

41
        "github.com/noironetworks/aci-containers/pkg/apicapi"
42
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
43
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
44
        hppv1 "github.com/noironetworks/aci-containers/pkg/hpp/apis/aci.hpp/v1"
45
        "github.com/noironetworks/aci-containers/pkg/index"
46
        "github.com/noironetworks/aci-containers/pkg/ipam"
47
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
48
        "github.com/noironetworks/aci-containers/pkg/metadata"
49
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
50
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
51
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
52
        "github.com/noironetworks/aci-containers/pkg/util"
53
)
54

55
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
56
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
57
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
58
type listNetworkPoliciesFunc func(string) (*v1net.NetworkPolicyList, error)
59
type listNamespacesFunc func() (*v1.NamespaceList, error)
60

61
type AciController struct {
62
        log    *logrus.Logger
63
        config *ControllerConfig
64
        env    Environment
65

66
        defaultEg string
67
        defaultSg string
68

69
        unitTestMode bool
70

71
        podQueue               workqueue.RateLimitingInterface
72
        netPolQueue            workqueue.RateLimitingInterface
73
        qosQueue               workqueue.RateLimitingInterface
74
        serviceQueue           workqueue.RateLimitingInterface
75
        snatQueue              workqueue.RateLimitingInterface
76
        netflowQueue           workqueue.RateLimitingInterface
77
        erspanQueue            workqueue.RateLimitingInterface
78
        snatNodeInfoQueue      workqueue.RateLimitingInterface
79
        rdConfigQueue          workqueue.RateLimitingInterface
80
        istioQueue             workqueue.RateLimitingInterface
81
        nodeFabNetAttQueue     workqueue.RateLimitingInterface
82
        netFabConfigQueue      workqueue.RateLimitingInterface
83
        nadVlanMapQueue        workqueue.RateLimitingInterface
84
        fabricVlanPoolQueue    workqueue.RateLimitingInterface
85
        netFabL3ConfigQueue    workqueue.RateLimitingInterface
86
        remIpContQueue         workqueue.RateLimitingInterface
87
        epgDnCacheUpdateQueue  workqueue.RateLimitingInterface
88
        aaepMonitorConfigQueue workqueue.RateLimitingInterface
89

90
        namespaceIndexer                     cache.Indexer
91
        namespaceInformer                    cache.Controller
92
        podIndexer                           cache.Indexer
93
        podInformer                          cache.Controller
94
        endpointsIndexer                     cache.Indexer
95
        endpointsInformer                    cache.Controller
96
        serviceIndexer                       cache.Indexer
97
        serviceInformer                      cache.Controller
98
        replicaSetIndexer                    cache.Indexer
99
        replicaSetInformer                   cache.Controller
100
        deploymentIndexer                    cache.Indexer
101
        deploymentInformer                   cache.Controller
102
        nodeIndexer                          cache.Indexer
103
        nodeInformer                         cache.Controller
104
        networkPolicyIndexer                 cache.Indexer
105
        networkPolicyInformer                cache.Controller
106
        snatIndexer                          cache.Indexer
107
        snatInformer                         cache.Controller
108
        snatNodeInfoIndexer                  cache.Indexer
109
        snatNodeInformer                     cache.Controller
110
        snatLocalInfoInformer                cache.Controller
111
        crdInformer                          cache.Controller
112
        rdConfigInformer                     cache.Controller
113
        rdConfigIndexer                      cache.Indexer
114
        qosIndexer                           cache.Indexer
115
        qosInformer                          cache.Controller
116
        netflowIndexer                       cache.Indexer
117
        netflowInformer                      cache.Controller
118
        erspanIndexer                        cache.Indexer
119
        erspanInformer                       cache.Controller
120
        nodePodIfIndexer                     cache.Indexer
121
        nodePodIfInformer                    cache.Controller
122
        istioIndexer                         cache.Indexer
123
        istioInformer                        cache.Controller
124
        endpointSliceIndexer                 cache.Indexer
125
        endpointSliceInformer                cache.Controller
126
        snatCfgInformer                      cache.Controller
127
        updatePod                            podUpdateFunc
128
        updateNode                           nodeUpdateFunc
129
        updateServiceStatus                  serviceUpdateFunc
130
        listNetworkPolicies                  listNetworkPoliciesFunc
131
        listNamespaces                       listNamespacesFunc
132
        nodeFabNetAttInformer                cache.SharedIndexInformer
133
        netFabConfigInformer                 cache.SharedIndexInformer
134
        nadVlanMapInformer                   cache.SharedIndexInformer
135
        fabricVlanPoolInformer               cache.SharedIndexInformer
136
        networkFabricL3ConfigurationInformer cache.SharedIndexInformer
137
        fabNetAttClient                      *fabattclset.Clientset
138
        proactiveConfInformer                cache.SharedIndexInformer
139
        aaepMonitorInformer                  cache.SharedIndexInformer
140
        poster                               *EventPoster
141

142
        indexMutex sync.Mutex
143
        hppMutex   sync.Mutex
144

145
        configuredPodNetworkIps *netIps
146
        podNetworkIps           *netIps
147
        serviceIps              *ipam.IpCache
148
        staticServiceIps        *netIps
149
        nodeServiceIps          *netIps
150

151
        // index of pods matched by deployments
152
        depPods *index.PodSelectorIndex
153
        // index of pods matched by network policies
154
        netPolPods *index.PodSelectorIndex
155
        // index of pods matched by network policy ingress rules
156
        netPolIngressPods *index.PodSelectorIndex
157
        // index of pods matched by network policy egress rules
158
        netPolEgressPods *index.PodSelectorIndex
159
        // index of IP addresses contained in endpoints objects
160
        endpointsIpIndex cidranger.Ranger
161
        // index of service target ports
162
        targetPortIndex map[string]*portIndexEntry
163
        // index of ip blocks referenced by network policy egress rules
164
        netPolSubnetIndex cidranger.Ranger
165
        // index of pods matched by erspan policies
166
        erspanPolPods *index.PodSelectorIndex
167

168
        apicConn *apicapi.ApicConnection
169

170
        nodeServiceMetaCache map[string]*nodeServiceMeta
171
        nodeACIPod           map[string]aciPodAnnot
172
        nodeACIPodAnnot      map[string]aciPodAnnot
173
        nodeOpflexDevice     map[string]apicapi.ApicSlice
174
        nodePodNetCache      map[string]*nodePodNetMeta
175
        serviceMetaCache     map[string]*serviceMeta
176
        snatPolicyCache      map[string]*ContSnatPolicy
177
        delayedEpSlices      []*DelayedEpSlice
178
        snatServices         map[string]bool
179
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
180
        rdConfigCache        map[string]*rdConfig.RdConfig
181
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
182
        istioCache           map[string]*istiov1.AciIstioOperator
183
        podIftoEp            map[string]*EndPointData
184
        // Node Name and Policy Name
185
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
186
        nodeSyncEnabled     bool
187
        serviceSyncEnabled  bool
188
        snatSyncEnabled     bool
189
        syncQueue           workqueue.RateLimitingInterface
190
        syncProcessors      map[string]func() bool
191
        serviceEndPoints    ServiceEndPointType
192
        crdHandlers         map[string]func(*AciController, <-chan struct{})
193
        stopCh              <-chan struct{}
194
        //index of containerportname to ctrPortNameEntry
195
        ctrPortNameCache map[string]*ctrPortNameEntry
196
        // named networkPolicies
197
        nmPortNp map[string]bool
198
        //maps network policy hash to hpp
199
        hppRef map[string]hppReference
200
        //map for ns to remoteIpConts
201
        nsRemoteIpCont map[string]remoteIpConts
202
        // cache to look for Epg DNs which are bound to Vmm domain
203
        cachedEpgDns             []string
204
        vmmClusterFaultSupported bool
205
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
206
        //Used in Shared mode
207
        sharedEncapCache       map[int]*sharedEncapData
208
        sharedEncapAepCache    map[string]map[int]bool
209
        sharedEncapSviCache    map[int]*NfL3Data
210
        sharedEncapVrfCache    map[string]*NfVrfData
211
        sharedEncapTenantCache map[string]*NfTenantData
212
        nfl3configGenerationId int64
213
        // vlan to propertiesList
214
        sharedEncapNfcCache         map[int]*NfcData
215
        sharedEncapNfcVlanMap       map[int]*NfcData
216
        sharedEncapNfcLabelMap      map[string]*NfcData
217
        sharedEncapNfcAppProfileMap map[string]bool
218
        // nadVlanMap encapLabel to vlan
219
        sharedEncapLabelMap      map[string][]int
220
        lldpIfCache              map[string]*NfLLDPIfData
221
        globalVlanConfig         globalVlanConfig
222
        fabricVlanPoolMap        map[string]map[string]string
223
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
224
        hostFabricPathDnMap      map[string]hostFabricInfo
225
        openStackSystemId        string
226
        sharedAaepMonitor        map[string]map[string]*AaepEpgAttachData
227
}
228

229
// hostFabricInfo groups a host name with a fabric path DN and a set of
// VPC interface DNs.
type hostFabricInfo struct {
	fabricPathDn string
	host         string
	// vpcIfDn is used as a set: only the keys carry information.
	vpcIfDn map[string]struct{}
}
234

235
// NfLLDPIfData pairs an LLDP interface string with the set of references
// to it.
type NfLLDPIfData struct {
	LLDPIf string
	// Refs is managed at the NAD level for now; tracking at a finer
	// granularity would add interface-tracking complexity without
	// sufficient benefit.
	Refs map[string]bool
}
242

243
type NfL3OutData struct {
244
        // +kubebuilder:validation:Enum:"import"
245
        RtCtrl     string
246
        PodId      int
247
        RtrNodeMap map[int]*fabattv1.FabricL3OutRtrNode
248
        ExtEpgMap  map[string]*fabattv1.PolicyPrefixGroup
249
        SviMap     map[int]bool
250
}
251

252
type NfTenantData struct {
253
        CommonTenant     bool
254
        L3OutConfig      map[string]*NfL3OutData
255
        BGPPeerPfxConfig map[string]*fabattv1.BGPPeerPrefixPolicy
256
}
257

258
type NfVrfData struct {
259
        TenantConfig map[string]*NfTenantData
260
}
261

262
type NfL3Networks struct {
263
        fabattv1.PrimaryNetwork
264
        Subnets map[string]*fabattv1.FabricL3Subnet
265
}
266

267
type NfL3Data struct {
268
        Tenant      string
269
        Vrf         fabattv1.VRF
270
        PodId       int
271
        ConnectedNw *NfL3Networks
272
        NetAddr     map[string]*RoutedNetworkData
273
        Nodes       map[int]fabattv1.FabricL3OutNode
274
}
275

276
type (
	// remoteIpConts maps a pod name to its remoteIpCont.
	remoteIpConts map[string]remoteIpCont

	// remoteIpCont maps an IP to the pod's labels.
	remoteIpCont map[string]map[string]string
)
281

282
type NfcData struct {
283
        Aeps map[string]bool
284
        Epg  fabattv1.Epg
285
}
286

287
type sharedEncapData struct {
288
        //node to NAD to pods
289
        Pods   map[string]map[string][]string
290
        NetRef map[string]*AdditionalNetworkMeta
291
        Aeps   map[string]bool
292
}
293

294
type globalVlanConfig struct {
295
        SharedPhysDom apicapi.ApicObject
296
        SharedL3Dom   apicapi.ApicObject
297
}
298

299
type hppReference struct {
300
        RefCount uint              `json:"ref-count,omitempty"`
301
        Npkeys   []string          `json:"npkeys,omitempty"`
302
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
303
        HppCr    hppv1.HostprotPol `json:"hpp-cr,omitempty"`
304
}
305

306
type DelayedEpSlice struct {
307
        ServiceKey  string
308
        OldEpSlice  *discovery.EndpointSlice
309
        NewEpSlice  *discovery.EndpointSlice
310
        DelayedTime time.Time
311
}
312

313
// aciPodAnnot is the value type of AciController.nodeACIPod and
// AciController.nodeACIPodAnnot: an ACI pod string, a single-opflex-odev
// flag and three timestamps.
type aciPodAnnot struct {
	aciPod             string
	isSingleOpflexOdev bool
	disconnectTime     time.Time
	connectTime        time.Time
	lastErrorTime      time.Time
}
320

321
type nodeServiceMeta struct {
322
        serviceEp metadata.ServiceEndpoint
323
}
324

325
type nodePodNetMeta struct {
326
        nodePods            map[string]bool
327
        podNetIps           metadata.NetIps
328
        podNetIpsAnnotation string
329
}
330

331
// openstackOpflexOdevInfo pairs a set of opflex ODev DNs with a fabric
// path DN.
type openstackOpflexOdevInfo struct {
	// opflexODevDn is used as a set: only the keys carry information.
	opflexODevDn map[string]struct{}
	fabricPathDn string
}
335

336
// serviceMeta is the value type of AciController.serviceMetaCache; it
// holds three IP lists: requested, ingress and static ingress.
type serviceMeta struct {
	requestedIps     []net.IP
	ingressIps       []net.IP
	staticIngressIps []net.IP
}
341

342
// ipIndexEntry associates an IP network with a set of keys.
type ipIndexEntry struct {
	ipNet net.IPNet
	keys  map[string]bool
}
346

347
type targetPort struct {
348
        proto v1.Protocol
349
        ports []int
350
}
351

352
type portIndexEntry struct {
353
        port              targetPort
354
        serviceKeys       map[string]bool
355
        networkPolicyKeys map[string]bool
356
}
357

358
// portRangeSnat is a port range described by its start and end values.
type portRangeSnat struct {
	start int
	end   int
}
362

363
// EndPointData holds the PodIF data kept by the controller: MAC address,
// EPG, namespace and application profile.
type EndPointData struct {
	MacAddr    string
	EPG        string
	Namespace  string
	AppProfile string
}
370

371
// ctrPortNameEntry is the value type of AciController.ctrPortNameCache.
type ctrPortNameEntry struct {
	// ctrNmpToPods maps "proto+port" to the set of pods.
	ctrNmpToPods map[string]map[string]bool
}
375

376
// LinkData holds a list of links and a list of pods.
type LinkData struct {
	Link []string
	Pods []string
}
380

381
// RoutedNodeData holds an address string and an integer index.
type RoutedNodeData struct {
	addr string
	idx  int
}
385

386
type RoutedNetworkData struct {
387
        subnet       string
388
        netAddress   string
389
        maskLen      int
390
        numAllocated int
391
        maxAddresses int
392
        baseAddress  net.IP
393
        nodeMap      map[string]RoutedNodeData
394
        availableMap map[int]bool
395
}
396

397
type AdditionalNetworkMeta struct {
398
        NetworkName string
399
        EncapVlan   string
400
        //node+localiface->fabricLinks
401
        FabricLink map[string]map[string]LinkData
402
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
403
        Mode       util.EncapMode
404
}
405

406
type ServiceEndPointType interface {
407
        InitClientInformer(kubeClient *kubernetes.Clientset)
408
        Run(stopCh <-chan struct{})
409
        Wait(stopCh <-chan struct{})
410
        UpdateServicesForNode(nodename string)
411
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
412
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
413
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
414
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
415
}
416

417
type serviceEndpoint struct {
418
        cont *AciController
419
}
420
type serviceEndpointSlice struct {
421
        cont *AciController
422
}
423

424
// AaepEpgAttachData is the innermost value type of
// AciController.sharedAaepMonitor: an encap VLAN, a NAD name, a namespace
// name and a flag recording whether the NAD was created.
type AaepEpgAttachData struct {
	encapVlan     int
	nadName       string
	namespaceName string
	nadCreated    bool
}
430

431
// EpgVlanMap pairs an EPG DN with an encap VLAN.
type EpgVlanMap struct {
	epgDn     string
	encapVlan int
}
435

436
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
437
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
UNCOV
438
}
×
439

440
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
441
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
UNCOV
442
}
×
443

444
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
445
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
446
}
1✔
447

448
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
449
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
450
}
1✔
451

452
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
453
        cache.WaitForCacheSync(stopCh,
1✔
454
                sep.cont.endpointsInformer.HasSynced,
1✔
455
                sep.cont.serviceInformer.HasSynced)
1✔
456
}
1✔
457

458
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
459
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
460
        cache.WaitForCacheSync(stopCh,
1✔
461
                seps.cont.endpointSliceInformer.HasSynced,
1✔
462
                seps.cont.serviceInformer.HasSynced)
1✔
463
}
1✔
464

465
func (e *ipIndexEntry) Network() net.IPNet {
1✔
466
        return e.ipNet
1✔
467
}
1✔
468

469
func newNodePodNetMeta() *nodePodNetMeta {
1✔
470
        return &nodePodNetMeta{
1✔
471
                nodePods: make(map[string]bool),
1✔
472
        }
1✔
473
}
1✔
474

475
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
476
        return workqueue.NewNamedRateLimitingQueue(
1✔
477
                workqueue.NewMaxOfRateLimiter(
1✔
478
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
479
                                10*time.Second),
1✔
480
                        &workqueue.BucketRateLimiter{
1✔
481
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
482
                        },
1✔
483
                ),
1✔
484
                "delta")
1✔
485
}
1✔
486

487
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
488
        cont := &AciController{
1✔
489
                log:          log,
1✔
490
                config:       config,
1✔
491
                env:          env,
1✔
492
                defaultEg:    "",
1✔
493
                defaultSg:    "",
1✔
494
                unitTestMode: unittestmode,
1✔
495

1✔
496
                podQueue:               createQueue("pod"),
1✔
497
                netPolQueue:            createQueue("networkPolicy"),
1✔
498
                qosQueue:               createQueue("qos"),
1✔
499
                netflowQueue:           createQueue("netflow"),
1✔
500
                erspanQueue:            createQueue("erspan"),
1✔
501
                serviceQueue:           createQueue("service"),
1✔
502
                snatQueue:              createQueue("snat"),
1✔
503
                snatNodeInfoQueue:      createQueue("snatnodeinfo"),
1✔
504
                rdConfigQueue:          createQueue("rdconfig"),
1✔
505
                istioQueue:             createQueue("istio"),
1✔
506
                nodeFabNetAttQueue:     createQueue("nodefabricnetworkattachment"),
1✔
507
                netFabConfigQueue:      createQueue("networkfabricconfiguration"),
1✔
508
                nadVlanMapQueue:        createQueue("nadvlanmap"),
1✔
509
                fabricVlanPoolQueue:    createQueue("fabricvlanpool"),
1✔
510
                netFabL3ConfigQueue:    createQueue("networkfabricl3configuration"),
1✔
511
                remIpContQueue:         createQueue("remoteIpContainer"),
1✔
512
                epgDnCacheUpdateQueue:  createQueue("epgDnCache"),
1✔
513
                aaepMonitorConfigQueue: createQueue("aaepepgmap"),
1✔
514
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
515
                        &workqueue.BucketRateLimiter{
1✔
516
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
517
                        }, "sync"),
1✔
518

1✔
519
                configuredPodNetworkIps: newNetIps(),
1✔
520
                podNetworkIps:           newNetIps(),
1✔
521
                serviceIps:              ipam.NewIpCache(),
1✔
522
                staticServiceIps:        newNetIps(),
1✔
523
                nodeServiceIps:          newNetIps(),
1✔
524

1✔
525
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
526
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
527
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
528

1✔
529
                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
1✔
530
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
1✔
531
                serviceMetaCache:            make(map[string]*serviceMeta),
1✔
532
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
1✔
533
                snatServices:                make(map[string]bool),
1✔
534
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
1✔
535
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
1✔
536
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
1✔
537
                podIftoEp:                   make(map[string]*EndPointData),
1✔
538
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
539
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
1✔
540
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
1✔
541
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
1✔
542
                nmPortNp:                    make(map[string]bool),
1✔
543
                hppRef:                      make(map[string]hppReference),
1✔
544
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
1✔
545
                sharedEncapCache:            make(map[int]*sharedEncapData),
1✔
546
                sharedEncapAepCache:         make(map[string]map[int]bool),
1✔
547
                sharedEncapSviCache:         make(map[int]*NfL3Data),
1✔
548
                sharedEncapVrfCache:         make(map[string]*NfVrfData),
1✔
549
                sharedEncapTenantCache:      make(map[string]*NfTenantData),
1✔
550
                sharedEncapNfcCache:         make(map[int]*NfcData),
1✔
551
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
1✔
552
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
1✔
553
                sharedEncapNfcAppProfileMap: make(map[string]bool),
1✔
554
                sharedEncapLabelMap:         make(map[string][]int),
1✔
555
                lldpIfCache:                 make(map[string]*NfLLDPIfData),
1✔
556
                fabricVlanPoolMap:           make(map[string]map[string]string),
1✔
557
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
1✔
558
                hostFabricPathDnMap:         make(map[string]hostFabricInfo),
1✔
559
                nsRemoteIpCont:              make(map[string]remoteIpConts),
1✔
560
                sharedAaepMonitor:           make(map[string]map[string]*AaepEpgAttachData),
1✔
561
        }
1✔
562
        cont.syncProcessors = map[string]func() bool{
1✔
563
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
564
                "rdConfig":       cont.syncRdConfig,
1✔
565
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
566
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
567
                   and aci-containers-operator images for istio.io/istio package
1✔
568

1✔
569
                "istioCR":        cont.createIstioCR,
1✔
570
                */
1✔
571
        }
1✔
572
        return cont
1✔
573
}
1✔
574

575
func (cont *AciController) Init() {
×
576
        if cont.config.ChainedMode {
×
577
                cont.log.Info("In chained mode")
×
578
        }
×
579
        if cont.config.VmmLite {
×
580
                cont.log.Info("In VMM lite mode")
×
UNCOV
581
        }
×
582

583
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
584
        if err != nil {
×
585
                cont.log.Error("Could not serialize default endpoint group")
×
UNCOV
586
                panic(err.Error())
×
587
        }
588
        cont.defaultEg = string(egdata)
×
589

×
590
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
591
        if err != nil {
×
592
                cont.log.Error("Could not serialize default security groups")
×
UNCOV
593
                panic(err.Error())
×
594
        }
595
        cont.defaultSg = string(sgdata)
×
596

×
597
        cont.log.Debug("Initializing IPAM")
×
598
        cont.initIpam()
×
599
        // check if the cluster supports endpoint slices
×
600
        // if cluster doesn't have the support fallback to endpoints
×
601
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
602
        if util.IsEndPointSlicesSupported(kubeClient) {
×
603
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
604
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
605
                cont.log.Info("Initializing ServiceEndpointSlices")
×
606
        } else {
×
607
                cont.serviceEndPoints = &serviceEndpoint{}
×
608
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
609
                cont.log.Info("Initializing ServiceEndpoints")
×
UNCOV
610
        }
×
611

612
        err = cont.env.Init(cont)
×
613
        if err != nil {
×
UNCOV
614
                panic(err.Error())
×
615
        }
616
}
617

618
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
619
        store cache.Store, handler func(interface{}) bool,
620
        deleteHandler func(string) bool,
621
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
622
        go wait.Until(func() {
2✔
623
                for {
2✔
624
                        key, quit := queue.Get()
1✔
625
                        if quit {
2✔
626
                                break
1✔
627
                        }
628

629
                        var requeue bool
1✔
630
                        switch key := key.(type) {
1✔
631
                        case chan struct{}:
×
UNCOV
632
                                close(key)
×
633
                        case string:
1✔
634
                                if strings.HasPrefix(key, "DELETED_") {
2✔
635
                                        delKey := strings.Trim(key, "DELETED_")
1✔
636
                                        requeue = deleteHandler(delKey)
1✔
637
                                } else {
2✔
638
                                        obj, exists, err := store.GetByKey(key)
1✔
639
                                        if err != nil {
1✔
640
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
UNCOV
641
                                        }
×
642
                                        //Handle Add/Update/Delete
643
                                        if exists && handler != nil {
2✔
644
                                                requeue = handler(obj)
1✔
645
                                        }
1✔
646
                                        //Handle Post Delete
647
                                        if !exists && postDelHandler != nil {
1✔
648
                                                requeue = postDelHandler()
×
UNCOV
649
                                        }
×
650
                                }
651
                        }
652
                        if requeue {
2✔
653
                                queue.AddRateLimited(key)
1✔
654
                        } else {
2✔
655
                                queue.Forget(key)
1✔
656
                        }
1✔
657
                        queue.Done(key)
1✔
658
                }
659
        }, time.Second, stopCh)
660
        <-stopCh
1✔
661
        queue.ShutDown()
1✔
662
}
663

664
func (cont *AciController) processRemIpContQueue(queue workqueue.RateLimitingInterface,
665
        handler func(interface{}) bool,
666
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
667
        go wait.Until(func() {
2✔
668
                for {
2✔
669
                        key, quit := queue.Get()
1✔
670
                        if quit {
2✔
671
                                break
1✔
672
                        }
673

674
                        var requeue bool
1✔
675
                        switch key := key.(type) {
1✔
676
                        case chan struct{}:
×
UNCOV
677
                                close(key)
×
678
                        case string:
1✔
679
                                if handler != nil {
2✔
680
                                        requeue = handler(key)
1✔
681
                                }
1✔
682
                                if postDelHandler != nil {
2✔
683
                                        requeue = postDelHandler()
1✔
684
                                }
1✔
685
                        }
686
                        if requeue {
1✔
UNCOV
687
                                queue.AddRateLimited(key)
×
688
                        } else {
1✔
689
                                queue.Forget(key)
1✔
690
                        }
1✔
691
                        queue.Done(key)
1✔
692

693
                }
694
        }, time.Second, stopCh)
695
        <-stopCh
1✔
696
        queue.ShutDown()
1✔
697
}
698

699
func (cont *AciController) processEpgDnCacheUpdateQueue(queue workqueue.RateLimitingInterface,
700
        handler func(interface{}) bool,
701
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
702
        go wait.Until(func() {
2✔
703
                for {
2✔
704
                        key, quit := queue.Get()
1✔
705
                        if quit {
2✔
706
                                break
1✔
707
                        }
708

709
                        var requeue bool
1✔
710
                        switch key := key.(type) {
1✔
711
                        case chan struct{}:
×
UNCOV
712
                                close(key)
×
713
                        case bool:
1✔
714
                                if handler != nil {
2✔
715
                                        requeue = handler(key)
1✔
716
                                }
1✔
717
                                if postDelHandler != nil {
1✔
718
                                        requeue = postDelHandler()
×
UNCOV
719
                                }
×
720
                        }
721
                        if requeue {
1✔
UNCOV
722
                                queue.AddRateLimited(key)
×
723
                        } else {
1✔
724
                                queue.Forget(key)
1✔
725
                        }
1✔
726
                        queue.Done(key)
1✔
727

728
                }
729
        }, time.Second, stopCh)
730
        <-stopCh
1✔
731
        queue.ShutDown()
1✔
732
}
733

734
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
735
        return apicapi.ApicSlice{}
1✔
736
}
1✔
737

738
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
739
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
740
}
1✔
741

742
func (cont *AciController) initStaticObjs() {
1✔
743
        cont.env.InitStaticAciObjects()
1✔
744
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
745
                cont.globalStaticObjs())
1✔
746
}
1✔
747

748
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
749
        vmmProv = "Kubernetes"
1✔
750
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
751
                vmmProv = "OpenShift"
×
UNCOV
752
        }
×
753
        return
1✔
754
}
755

756
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
757
        var err error
1✔
758
        var privKey []byte
1✔
759
        var apicCert []byte
1✔
760

1✔
761
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
762

1✔
763
        if cont.config.ApicPrivateKeyPath != "" {
1✔
764
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
765
                if err != nil {
×
UNCOV
766
                        panic(err)
×
767
                }
768
        }
769
        if cont.config.ApicCertPath != "" {
1✔
770
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
771
                if err != nil {
×
UNCOV
772
                        panic(err)
×
773
                }
774
        }
775
        // If not defined, default is 1800
776
        if cont.config.ApicRefreshTimer == "" {
2✔
777
                cont.config.ApicRefreshTimer = "1800"
1✔
778
        }
1✔
779
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
780
        if err != nil {
1✔
UNCOV
781
                panic(err)
×
782
        }
783
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
784

1✔
785
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
786
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
787
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
UNCOV
788
                panic(err)
×
789
        }
790

791
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
792
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
793
                cont.config.ApicRefreshTickerAdjust = "210"
1✔
794
        }
1✔
795
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
796
        if err != nil {
1✔
UNCOV
797
                panic(err)
×
798
        }
799

800
        //If ApicSubscriptionDelay is not defined, default to 100ms
801
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
802
                cont.config.ApicSubscriptionDelay = 100
1✔
803
        }
1✔
804
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
805

1✔
806
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
807
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
808
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
809
        }
1✔
810

811
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
812
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
813
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
814
        }
1✔
815
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
816

1✔
817
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
818
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
819
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
820
        }
1✔
821

822
        // If not defined, default to 32
823
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
824
                cont.config.PodIpPoolChunkSize = 32
1✔
825
        }
1✔
826
        if !cont.isCNOEnabled() {
2✔
827
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
828
        }
1✔
829

830
        // If ApicConnectionRetryLimit is not defined, default to 5
831
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
832
                cont.config.ApicConnectionRetryLimit = 5
1✔
833
        }
1✔
834
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
835

1✔
836
        // If not valid, default to 5000-65000
1✔
837
        // other permissible values 1-65000
1✔
838
        defStart := 5000
1✔
839
        defEnd := 65000
1✔
840
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
841
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
842
        }
1✔
843
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
844
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
845
        }
1✔
846
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
847
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
848
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
849
                cont.config.SnatDefaultPortRangeStart = defStart
×
850
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
UNCOV
851
        }
×
852

853
        // Set default value for pbr programming delay if services list is not empty
854
        // and delay value is empty
855
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
856
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
857
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
858
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
UNCOV
859
        }
×
860
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
861
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
UNCOV
862
        }
×
863

864
        // Set contract scope for snat svc graph to global by default
865
        if cont.config.SnatSvcContractScope == "" {
2✔
866
                cont.config.SnatSvcContractScope = "global"
1✔
867
        }
1✔
868
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
869
                cont.config.MaxSvcGraphNodes = 32
1✔
870
        }
1✔
871
        if !cont.isCNOEnabled() {
2✔
872
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
873
        }
1✔
874
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
875
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
876
                privKey, apicCert, cont.config.AciPrefix,
1✔
877
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
878
                cont.config.AciVrfTenant, cont.UpdateLLDPIfLocked)
1✔
879
        if err != nil {
1✔
UNCOV
880
                panic(err)
×
881
        }
882

883
        cont.apicConn.FilterOpflexDevice = cont.config.FilterOpflexDevice
1✔
884
        cont.apicConn.Flavor = cont.config.Flavor
1✔
885
        cont.apicConn.VmmDomain = cont.config.AciVmmDomain
1✔
886
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
887
        cont.apicConn.RequestRetryDelay = cont.config.ApicRequestRetryDelay
1✔
888
        cont.apicConn.EnableRequestRetry = cont.config.EnableApicRequestRetry
1✔
889

1✔
890
        if len(cont.config.ApicHosts) != 0 {
1✔
891
        APIC_SWITCH:
×
892
                cont.log.WithFields(logrus.Fields{
×
893
                        "mod":  "APICAPI",
×
894
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
895
                }).Debug("Connecting to APIC to determine the Version")
×
896

×
897
                version, err := cont.apicConn.GetVersion()
×
898
                if err != nil {
×
899
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
900
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
901
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
UNCOV
902
                        goto APIC_SWITCH
×
903
                }
904
                cont.apicConn.CachedVersion = version
×
905
                apicapi.ApicVersion = version
×
906
                if version >= "4.2(4i)" {
×
907
                        cont.apicConn.SnatPbrFltrChain = true
×
908
                } else {
×
909
                        cont.apicConn.SnatPbrFltrChain = false
×
910
                }
×
911
                if version >= "5.2" {
×
912
                        cont.vmmClusterFaultSupported = true
×
UNCOV
913
                }
×
914
        } else { // For unit-tests
1✔
915
                cont.apicConn.SnatPbrFltrChain = true
1✔
916
        }
1✔
917

918
        if !cont.isCNOEnabled() {
2✔
919
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
920
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
921
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
922
                        var expectedVrfRelations []string
×
923
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
924
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
925
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
926
                        if err != nil {
×
927
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
UNCOV
928
                                panic(err)
×
929
                        }
930
                }
931
        }
932

933
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.isCNOEnabled() {
1✔
934
                //Clear fault instances when the controller starts
×
935
                cont.clearFaultInstances()
×
936
                //Subscribe for vmmEpPD for a given domain
×
937
                var tnTargetFilterEpg string
×
938
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
939
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
940
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
941
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
942
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
943
                        func(obj apicapi.ApicObject) bool {
×
944
                                cont.vmmEpPDChanged(obj)
×
945
                                return true
×
946
                        },
×
947
                        func(dn string) {
×
948
                                cont.vmmEpPDDeleted(dn)
×
UNCOV
949
                        })
×
950
        }
951

952
        cont.initStaticObjs()
1✔
953

1✔
954
        err = cont.env.PrepareRun(stopCh)
1✔
955
        if err != nil {
1✔
UNCOV
956
                panic(err.Error())
×
957
        }
958

959
        cont.apicConn.FullSyncHook = func() {
1✔
960
                // put a channel into each work queue and wait on it to
×
961
                // checkpoint object syncing in response to new subscription
×
962
                // updates
×
963
                cont.log.Debug("Starting checkpoint")
×
964
                var chans []chan struct{}
×
965
                qs := make([]workqueue.RateLimitingInterface, 0)
×
966
                _, ok := cont.env.(*K8sEnvironment)
×
967
                if ok {
×
968
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
969
                        if !cont.isCNOEnabled() {
×
970
                                if !cont.config.DisableHppRendering {
×
971
                                        qs = append(qs, cont.netPolQueue)
×
972
                                }
×
973
                                if cont.config.EnableHppDirect {
×
974
                                        qs = append(qs, cont.remIpContQueue)
×
975
                                }
×
976
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
977
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
978
                                        cont.rdConfigQueue, cont.erspanQueue,
×
UNCOV
979
                                        cont.epgDnCacheUpdateQueue)
×
980
                        }
981
                }
982
                for _, q := range qs {
×
983
                        c := make(chan struct{})
×
984
                        chans = append(chans, c)
×
985
                        q.Add(c)
×
986
                }
×
987
                for _, c := range chans {
×
988
                        <-c
×
989
                }
×
UNCOV
990
                cont.log.Debug("Checkpoint complete")
×
991
        }
992

993
        if len(cont.config.ApicHosts) != 0 && !cont.isCNOEnabled() {
1✔
994
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
995
                cont.scheduleRdConfig()
×
996
                if strings.Contains(cont.config.Flavor, "openstack") {
×
997
                        cont.setOpenStackSystemId()
×
UNCOV
998
                }
×
999
        }
1000

1001
        if !cont.isCNOEnabled() {
2✔
1002
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
1003
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1004
                                []string{"hostprotPol"})
1✔
1005
                }
1✔
1006
        } else {
1✔
1007
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1008
                        []string{"fvBD", "fvAp"})
1✔
1009
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
1010
                        []string{"fvnsVlanInstP"}, "")
1✔
1011
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
1012
                        []string{"infraRsDomP"}, "")
1✔
1013
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
1014
                        []string{"physDomP"}, "")
1✔
1015
                cont.apicConn.AddSubscriptionClass("l3extDomP",
1✔
1016
                        []string{"l3extDomP"}, "")
1✔
1017
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
1018
                        []string{"infraRsVlanNs"}, "")
1✔
1019
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
1020
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
1021
                cont.apicConn.AddSubscriptionClass("l3extOut",
1✔
1022
                        []string{"l3extInstP", "l3extSubnet", "fvRsCons", "fvRsProv", "l3extRsEctx", "l3extRsL3DomAtt", "l3extLNodeP", "l3extRsNodeL3OutAtt", "ipRouteP", "ipNexthopP", "l3extLIfP", "l3extVirtualLIfP", "l3extRsDynPathAtt",
1✔
1023
                                "l3extRsPathL3OutAtt", "l3extMember", "l3extIp", "bgpExtP", "bgpPeerP", "bgpAsP", "bgpLocalAsnP", "bgpRsPeerPfxPol"}, "")
1✔
1024
                cont.apicConn.AddSubscriptionClass("bgpPeerPfxPol",
1✔
1025
                        []string{"bgpPeerPfxPol"}, "")
1✔
1026
        }
1✔
1027

1028
        if cont.config.VmmLite {
1✔
1029
                cont.apicConn.AddSubscriptionClass("infraAttEntityP",
×
1030
                        []string{"infraRsFuncToEpg"}, "")
×
1031

×
1032
                cont.apicConn.SetSubscriptionHooks(
×
1033
                        "infraAttEntityP",
×
1034
                        func(obj apicapi.ApicObject) bool {
×
1035
                                cont.log.Debug("EPG attached to AAEP")
×
1036
                                cont.handleAaepEpgAttach(obj)
×
1037
                                return true
×
1038
                        },
×
1039
                        func(dn string) {
×
1040
                                cont.log.Debug("EPG detached from AAEP")
×
1041
                                cont.handleAaepEpgDetach(dn)
×
UNCOV
1042
                        },
×
1043
                )
1044
        }
1045

1046
        if !cont.isCNOEnabled() {
2✔
1047
                // When a new class is added for subscriptio, check if its name attribute
1✔
1048
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
1049
                // in apicapi.go
1✔
1050
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
1051
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
1052
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
1053
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
1054
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
UNCOV
1055
                }
×
1056
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
1057
                        subscribeMo)
1✔
1058
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
1059
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
1060
                        []string{"fvRsCons"})
1✔
1061
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
1062
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
1063
                        cont.config.AciVmmController)
1✔
1064
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
1065
                // Since it is not supported for APIC versions < "5.0"
1✔
1066
                cont.addVmmInjectedLabel()
1✔
1067
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
1068
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
1069

1✔
1070
                var tnTargetFilter string
1✔
1071
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
1072
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
1073
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
UNCOV
1074
                        }
×
1075
                } else {
1✔
1076
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
1077
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
1078
                }
1✔
1079
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
1080
                        tnTargetFilter)
1✔
1081
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
1082
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
1083

1✔
1084
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
1085
                        func(obj apicapi.ApicObject) bool {
1✔
1086
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
1087
                                return true
×
1088
                        },
×
1089
                        func(dn string) {
×
1090
                                cont.SubnetDeleted(dn)
×
UNCOV
1091
                        })
×
1092

1093
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
1094
                        []string{"opflexODev"}, "")
1✔
1095

1✔
1096
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
1097
                        func(obj apicapi.ApicObject) bool {
1✔
1098
                                cont.opflexDeviceChanged(obj)
×
1099
                                return true
×
1100
                        },
×
1101
                        func(dn string) {
×
1102
                                cont.opflexDeviceDeleted(dn)
×
UNCOV
1103
                        })
×
1104

1105
                if !cont.config.DisableServiceVlanPreprovisioning && !strings.Contains(cont.config.Flavor, "openstack") {
2✔
1106
                        if cont.config.AEP == "" {
2✔
1107
                                cont.log.Error("AEP is missing in configuration, preprovisioning of service vlan will be disabled")
1✔
1108
                        } else {
1✔
1109
                                infraRtAttEntPFilter := fmt.Sprintf("and(wcard(infraRtAttEntP.dn,\"/attentp-%s/\"))", cont.config.AEP)
×
1110
                                cont.apicConn.AddSubscriptionClass("infraRtAttEntP",
×
1111
                                        []string{"infraRtAttEntP"}, infraRtAttEntPFilter)
×
1112

×
1113
                                // For bare metal, the infraRtAttEntP associated with an AEP will be empty.
×
1114
                                // We should not receive any updates for such cases.
×
1115
                                cont.apicConn.SetSubscriptionHooks("infraRtAttEntP",
×
1116
                                        func(obj apicapi.ApicObject) bool {
×
1117
                                                cont.infraRtAttEntPChanged(obj)
×
1118
                                                return true
×
1119
                                        },
×
1120
                                        func(dn string) {
×
1121
                                                cont.infraRtAttEntPDeleted(dn)
×
UNCOV
1122
                                        })
×
1123

1124
                                cont.apicConn.AddSubscriptionClass("vpcIf",
×
1125
                                        []string{"vpcIf"}, "")
×
1126

×
1127
                                cont.apicConn.SetSubscriptionHooks("vpcIf",
×
1128
                                        func(obj apicapi.ApicObject) bool {
×
1129
                                                cont.vpcIfChanged(obj)
×
1130
                                                return true
×
1131
                                        },
×
1132
                                        func(dn string) {
×
1133
                                                cont.vpcIfDeleted(dn)
×
UNCOV
1134
                                        })
×
1135
                        }
1136
                }
1137

1138
                cont.apicConn.VersionUpdateHook =
1✔
1139
                        func() {
1✔
1140
                                cont.initStaticServiceObjs()
×
UNCOV
1141
                        }
×
1142
        }
1143
        go cont.apicConn.Run(stopCh)
1✔
1144
}
1145

1146
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1147
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
1148
        ticker := time.NewTicker(seconds * time.Second)
1✔
1149
        defer ticker.Stop()
1✔
1150

1✔
1151
        for {
2✔
1152
                select {
1✔
1153
                case <-ticker.C:
1✔
1154
                        if cont.config.EnableOpflexAgentReconnect {
1✔
1155
                                cont.checkChangeOfOpflexOdevAciPod()
×
UNCOV
1156
                        }
×
1157
                        if cont.config.AciMultipod {
1✔
1158
                                cont.checkChangeOfOdevAciPod()
×
UNCOV
1159
                        }
×
1160
                case <-stopCh:
1✔
1161
                        return
1✔
1162
                }
1163
        }
1164
}
1165

1166
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1167
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
1168
        ticker := time.NewTicker(seconds * time.Second)
1✔
1169
        defer ticker.Stop()
1✔
1170

1✔
1171
        for {
2✔
1172
                select {
1✔
1173
                case <-ticker.C:
1✔
1174
                        cont.deleteOldOpflexDevices()
1✔
1175
                case <-stopCh:
1✔
1176
                        return
1✔
1177
                }
1178
        }
1179
}
1180

1181
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1182
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
1183
        ticker := time.NewTicker(seconds * time.Second)
1✔
1184
        defer ticker.Stop()
1✔
1185

1✔
1186
        for {
2✔
1187
                select {
1✔
1188
                case <-ticker.C:
1✔
1189
                        cont.processDelayedEpSlices()
1✔
1190
                case <-stopCh:
1✔
1191
                        return
1✔
1192
                }
1193
        }
1194
}
1195

1196
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
1197
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
1198
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
1199
        iteration := 0
1✔
1200
        for {
2✔
1201
                // To avoid noisy logs, only printing once in 5 minutes
1✔
1202
                if iteration%5 == 0 {
2✔
1203
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
1204
                }
1✔
1205
                var nodeInfos []*nodeinfo.NodeInfo
1✔
1206
                cont.indexMutex.Lock()
1✔
1207
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
1208
                        func(nodeInfoObj interface{}) {
2✔
1209
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
1210
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
1211
                        })
1✔
1212
                expectedmap := make(map[string]map[string]bool)
1✔
1213
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
1214
                        for nodename, entry := range glinfo {
2✔
1215
                                if _, found := expectedmap[nodename]; !found {
2✔
1216
                                        newentry := make(map[string]bool)
1✔
1217
                                        newentry[entry.SnatPolicyName] = true
1✔
1218
                                        expectedmap[nodename] = newentry
1✔
1219
                                } else {
2✔
1220
                                        currententry := expectedmap[nodename]
1✔
1221
                                        currententry[entry.SnatPolicyName] = true
1✔
1222
                                        expectedmap[nodename] = currententry
1✔
1223
                                }
1✔
1224
                        }
1225
                }
1226
                cont.indexMutex.Unlock()
1✔
1227

1✔
1228
                for _, value := range nodeInfos {
2✔
1229
                        marked := false
1✔
1230
                        policyNames := value.Spec.SnatPolicyNames
1✔
1231
                        nodeName := value.ObjectMeta.Name
1✔
1232
                        _, ok := expectedmap[nodeName]
1✔
1233
                        if !ok && len(policyNames) > 0 {
2✔
1234
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
1235
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
1236
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1237
                                marked = true
1✔
1238
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
1239
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1240
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1241
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1242
                                marked = true
1✔
1243
                        } else {
2✔
1244
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
1245
                                        // No snatpolicies present
×
UNCOV
1246
                                        continue
×
1247
                                }
1248
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
1249
                                if !eq {
2✔
1250
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1251
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1252
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1253
                                        marked = true
1✔
1254
                                }
1✔
1255
                        }
1256
                        if marked {
2✔
1257
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
1258
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
1259
                                if err != nil {
1✔
1260
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
UNCOV
1261
                                        continue
×
1262
                                }
1263
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1264
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1265
                        } else if iteration%5 == 0 {
2✔
1266
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1267
                        }
1✔
1268
                }
1269
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1270
                iteration++
1✔
1271
        }
1272
}
1273

1274
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1275
        queueStop <-chan struct{}) {
1✔
1276
        go wait.Until(func() {
2✔
1277
                for {
2✔
1278
                        syncType, quit := queue.Get()
1✔
1279
                        if quit {
2✔
1280
                                break
1✔
1281
                        }
1282
                        var requeue bool
1✔
1283
                        if sType, ok := syncType.(string); ok {
2✔
1284
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1285
                                        requeue = f()
1✔
1286
                                }
1✔
1287
                        }
1288
                        if requeue {
1✔
UNCOV
1289
                                queue.AddRateLimited(syncType)
×
1290
                        } else {
1✔
1291
                                queue.Forget(syncType)
1✔
1292
                        }
1✔
1293
                        queue.Done(syncType)
1✔
1294
                }
1295
        }, time.Second, queueStop)
1296
        <-queueStop
1✔
1297
        queue.ShutDown()
1✔
1298
}
1299

1300
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1301
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1302
}
1✔
1303
func (cont *AciController) scheduleRdConfig() {
×
1304
        cont.syncQueue.AddRateLimited("rdConfig")
×
1305
}
×
1306
func (cont *AciController) scheduleCreateIstioCR() {
×
1307
        cont.syncQueue.AddRateLimited("istioCR")
×
UNCOV
1308
}
×
1309

1310
func (cont *AciController) addVmmInjectedLabel() {
1✔
1311
        if apicapi.ApicVersion >= "5.2" {
1✔
1312
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1313
                if err != nil {
×
UNCOV
1314
                        panic(err.Error())
×
1315
                }
1316
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1317
                if err != nil {
×
UNCOV
1318
                        panic(err.Error())
×
1319
                }
1320
        }
1321
        if apicapi.ApicVersion >= "5.0" {
2✔
1322
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1323
                if err != nil {
1✔
UNCOV
1324
                        panic(err.Error())
×
1325
                }
1326
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1327
                if err != nil {
1✔
UNCOV
1328
                        panic(err.Error())
×
1329
                }
1330
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1331
                if err != nil {
1✔
UNCOV
1332
                        panic(err.Error())
×
1333
                }
1334
        }
1335
}
1336

1337
func (cont *AciController) isCNOEnabled() bool {
1✔
1338
        return cont.config.ChainedMode || cont.config.VmmLite
1✔
1339
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc