• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 10668

22 May 2025 08:30PM UTC coverage: 68.664% (-0.3%) from 68.948%
10668

push

travis-pro

web-flow
Merge pull request #1529 from noironetworks/subscribe-vpcif-backport

Subscribe vpcIf for service VLAN pre-provisioning

2 of 115 new or added lines in 2 files covered. (1.74%)

10 existing lines in 3 files now uncovered.

13316 of 19393 relevant lines covered (68.66%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.88
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        v1net "k8s.io/api/networking/v1"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/util/workqueue"
40

41
        "github.com/noironetworks/aci-containers/pkg/apicapi"
42
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
43
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
44
        hppv1 "github.com/noironetworks/aci-containers/pkg/hpp/apis/aci.hpp/v1"
45
        "github.com/noironetworks/aci-containers/pkg/index"
46
        "github.com/noironetworks/aci-containers/pkg/ipam"
47
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
48
        "github.com/noironetworks/aci-containers/pkg/metadata"
49
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
50
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
51
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
52
        "github.com/noironetworks/aci-containers/pkg/util"
53
)
54

55
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
56
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
57
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
58
type listNetworkPoliciesFunc func(string) (*v1net.NetworkPolicyList, error)
59
type listNamespacesFunc func() (*v1.NamespaceList, error)
60

61
type AciController struct {
62
        log    *logrus.Logger
63
        config *ControllerConfig
64
        env    Environment
65

66
        defaultEg string
67
        defaultSg string
68

69
        unitTestMode bool
70

71
        podQueue              workqueue.RateLimitingInterface
72
        netPolQueue           workqueue.RateLimitingInterface
73
        qosQueue              workqueue.RateLimitingInterface
74
        serviceQueue          workqueue.RateLimitingInterface
75
        snatQueue             workqueue.RateLimitingInterface
76
        netflowQueue          workqueue.RateLimitingInterface
77
        erspanQueue           workqueue.RateLimitingInterface
78
        snatNodeInfoQueue     workqueue.RateLimitingInterface
79
        rdConfigQueue         workqueue.RateLimitingInterface
80
        istioQueue            workqueue.RateLimitingInterface
81
        nodeFabNetAttQueue    workqueue.RateLimitingInterface
82
        netFabConfigQueue     workqueue.RateLimitingInterface
83
        nadVlanMapQueue       workqueue.RateLimitingInterface
84
        fabricVlanPoolQueue   workqueue.RateLimitingInterface
85
        netFabL3ConfigQueue   workqueue.RateLimitingInterface
86
        remIpContQueue        workqueue.RateLimitingInterface
87
        epgDnCacheUpdateQueue workqueue.RateLimitingInterface
88

89
        namespaceIndexer                     cache.Indexer
90
        namespaceInformer                    cache.Controller
91
        podIndexer                           cache.Indexer
92
        podInformer                          cache.Controller
93
        endpointsIndexer                     cache.Indexer
94
        endpointsInformer                    cache.Controller
95
        serviceIndexer                       cache.Indexer
96
        serviceInformer                      cache.Controller
97
        replicaSetIndexer                    cache.Indexer
98
        replicaSetInformer                   cache.Controller
99
        deploymentIndexer                    cache.Indexer
100
        deploymentInformer                   cache.Controller
101
        nodeIndexer                          cache.Indexer
102
        nodeInformer                         cache.Controller
103
        networkPolicyIndexer                 cache.Indexer
104
        networkPolicyInformer                cache.Controller
105
        snatIndexer                          cache.Indexer
106
        snatInformer                         cache.Controller
107
        snatNodeInfoIndexer                  cache.Indexer
108
        snatNodeInformer                     cache.Controller
109
        snatLocalInfoInformer                cache.Controller
110
        crdInformer                          cache.Controller
111
        rdConfigInformer                     cache.Controller
112
        rdConfigIndexer                      cache.Indexer
113
        qosIndexer                           cache.Indexer
114
        qosInformer                          cache.Controller
115
        netflowIndexer                       cache.Indexer
116
        netflowInformer                      cache.Controller
117
        erspanIndexer                        cache.Indexer
118
        erspanInformer                       cache.Controller
119
        nodePodIfIndexer                     cache.Indexer
120
        nodePodIfInformer                    cache.Controller
121
        istioIndexer                         cache.Indexer
122
        istioInformer                        cache.Controller
123
        endpointSliceIndexer                 cache.Indexer
124
        endpointSliceInformer                cache.Controller
125
        snatCfgInformer                      cache.Controller
126
        updatePod                            podUpdateFunc
127
        updateNode                           nodeUpdateFunc
128
        updateServiceStatus                  serviceUpdateFunc
129
        listNetworkPolicies                  listNetworkPoliciesFunc
130
        listNamespaces                       listNamespacesFunc
131
        nodeFabNetAttInformer                cache.SharedIndexInformer
132
        netFabConfigInformer                 cache.SharedIndexInformer
133
        nadVlanMapInformer                   cache.SharedIndexInformer
134
        fabricVlanPoolInformer               cache.SharedIndexInformer
135
        networkFabricL3ConfigurationInformer cache.SharedIndexInformer
136
        fabNetAttClient                      *fabattclset.Clientset
137
        proactiveConfInformer                cache.SharedIndexInformer
138

139
        indexMutex sync.Mutex
140
        hppMutex   sync.Mutex
141

142
        configuredPodNetworkIps *netIps
143
        podNetworkIps           *netIps
144
        serviceIps              *ipam.IpCache
145
        staticServiceIps        *netIps
146
        nodeServiceIps          *netIps
147

148
        // index of pods matched by deployments
149
        depPods *index.PodSelectorIndex
150
        // index of pods matched by network policies
151
        netPolPods *index.PodSelectorIndex
152
        // index of pods matched by network policy ingress rules
153
        netPolIngressPods *index.PodSelectorIndex
154
        // index of pods matched by network policy egress rules
155
        netPolEgressPods *index.PodSelectorIndex
156
        // index of IP addresses contained in endpoints objects
157
        endpointsIpIndex cidranger.Ranger
158
        // index of service target ports
159
        targetPortIndex map[string]*portIndexEntry
160
        // index of ip blocks referenced by network policy egress rules
161
        netPolSubnetIndex cidranger.Ranger
162
        // index of pods matched by erspan policies
163
        erspanPolPods *index.PodSelectorIndex
164

165
        apicConn *apicapi.ApicConnection
166

167
        nodeServiceMetaCache map[string]*nodeServiceMeta
168
        nodeACIPod           map[string]aciPodAnnot
169
        nodeACIPodAnnot      map[string]aciPodAnnot
170
        nodeOpflexDevice     map[string]apicapi.ApicSlice
171
        nodePodNetCache      map[string]*nodePodNetMeta
172
        serviceMetaCache     map[string]*serviceMeta
173
        snatPolicyCache      map[string]*ContSnatPolicy
174
        delayedEpSlices      []*DelayedEpSlice
175
        snatServices         map[string]bool
176
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
177
        rdConfigCache        map[string]*rdConfig.RdConfig
178
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
179
        istioCache           map[string]*istiov1.AciIstioOperator
180
        podIftoEp            map[string]*EndPointData
181
        // Node Name and Policy Name
182
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
183
        nodeSyncEnabled     bool
184
        serviceSyncEnabled  bool
185
        snatSyncEnabled     bool
186
        syncQueue           workqueue.RateLimitingInterface
187
        syncProcessors      map[string]func() bool
188
        serviceEndPoints    ServiceEndPointType
189
        crdHandlers         map[string]func(*AciController, <-chan struct{})
190
        stopCh              <-chan struct{}
191
        //index of containerportname to ctrPortNameEntry
192
        ctrPortNameCache map[string]*ctrPortNameEntry
193
        // named networkPolicies
194
        nmPortNp map[string]bool
195
        //maps network policy hash to hpp
196
        hppRef map[string]hppReference
197
        //map for ns to remoteIpConts
198
        nsRemoteIpCont map[string]remoteIpConts
199
        // cache to look for Epg DNs which are bound to Vmm domain
200
        cachedEpgDns             []string
201
        vmmClusterFaultSupported bool
202
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
203
        //Used in Shared mode
204
        sharedEncapCache       map[int]*sharedEncapData
205
        sharedEncapAepCache    map[string]map[int]bool
206
        sharedEncapSviCache    map[int]*NfL3Data
207
        sharedEncapVrfCache    map[string]*NfVrfData
208
        sharedEncapTenantCache map[string]*NfTenantData
209
        nfl3configGenerationId int64
210
        // vlan to propertiesList
211
        sharedEncapNfcCache         map[int]*NfcData
212
        sharedEncapNfcVlanMap       map[int]*NfcData
213
        sharedEncapNfcLabelMap      map[string]*NfcData
214
        sharedEncapNfcAppProfileMap map[string]bool
215
        // nadVlanMap encapLabel to vlan
216
        sharedEncapLabelMap      map[string][]int
217
        lldpIfCache              map[string]*NfLLDPIfData
218
        globalVlanConfig         globalVlanConfig
219
        fabricVlanPoolMap        map[string]map[string]string
220
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
221
        hostFabricPathDnMap      map[string]hostFabricInfo
222
        openStackSystemId        string
223
}
224

225
// hostFabricInfo is the value type of AciController.hostFabricPathDnMap.
type hostFabricInfo struct {
	fabricPathDn, host string
	// vpcIfDn is a set of strings (empty-struct values).
	vpcIfDn map[string]struct{}
}
230

231
// NfLLDPIfData is the value type of AciController.lldpIfCache.
type NfLLDPIfData struct {
	LLDPIf string

	// Refs is currently managed at the NAD level: tracking at a finer
	// granularity would add interface-tracking complexity without
	// sufficient benefit.
	Refs map[string]bool
}
238

239
type NfL3OutData struct {
240
        // +kubebuilder:validation:Enum:"import"
241
        RtCtrl     string
242
        PodId      int
243
        RtrNodeMap map[int]*fabattv1.FabricL3OutRtrNode
244
        ExtEpgMap  map[string]*fabattv1.PolicyPrefixGroup
245
        SviMap     map[int]bool
246
}
247

248
type NfTenantData struct {
249
        CommonTenant     bool
250
        L3OutConfig      map[string]*NfL3OutData
251
        BGPPeerPfxConfig map[string]*fabattv1.BGPPeerPrefixPolicy
252
}
253

254
type NfVrfData struct {
255
        TenantConfig map[string]*NfTenantData
256
}
257

258
type NfL3Networks struct {
259
        fabattv1.PrimaryNetwork
260
        Subnets map[string]*fabattv1.FabricL3Subnet
261
}
262

263
type NfL3Data struct {
264
        Tenant      string
265
        Vrf         fabattv1.VRF
266
        PodId       int
267
        ConnectedNw *NfL3Networks
268
        NetAddr     map[string]*RoutedNetworkData
269
        Nodes       map[int]fabattv1.FabricL3OutNode
270
}
271

272
type (
	// remoteIpConts maps pod name to remoteIpCont. It is the value type of
	// AciController.nsRemoteIpCont.
	remoteIpConts map[string]remoteIpCont

	// remoteIpCont maps ip to pod labels.
	remoteIpCont map[string]map[string]string
)
277

278
type NfcData struct {
279
        Aeps map[string]bool
280
        Epg  fabattv1.Epg
281
}
282

283
type sharedEncapData struct {
284
        //node to NAD to pods
285
        Pods   map[string]map[string][]string
286
        NetRef map[string]*AdditionalNetworkMeta
287
        Aeps   map[string]bool
288
}
289

290
type globalVlanConfig struct {
291
        SharedPhysDom apicapi.ApicObject
292
        SharedL3Dom   apicapi.ApicObject
293
}
294

295
type hppReference struct {
296
        RefCount uint              `json:"ref-count,omitempty"`
297
        Npkeys   []string          `json:"npkeys,omitempty"`
298
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
299
        HppCr    hppv1.HostprotPol `json:"hpp-cr,omitempty"`
300
}
301

302
type DelayedEpSlice struct {
303
        ServiceKey  string
304
        OldEpSlice  *discovery.EndpointSlice
305
        NewEpSlice  *discovery.EndpointSlice
306
        DelayedTime time.Time
307
}
308

309
// aciPodAnnot is the value type of AciController.nodeACIPod and
// AciController.nodeACIPodAnnot.
type aciPodAnnot struct {
	aciPod             string
	isSingleOpflexOdev bool

	// Timestamps; the zero time.Time means "not set" as far as this
	// declaration shows.
	disconnectTime, connectTime, lastErrorTime time.Time
}
316

317
type nodeServiceMeta struct {
318
        serviceEp metadata.ServiceEndpoint
319
}
320

321
type nodePodNetMeta struct {
322
        nodePods            map[string]bool
323
        podNetIps           metadata.NetIps
324
        podNetIpsAnnotation string
325
}
326

327
// openstackOpflexOdevInfo is the value type of
// AciController.openStackFabricPathDnMap.
type openstackOpflexOdevInfo struct {
	// opflexODevDn is a set of strings (empty-struct values).
	opflexODevDn map[string]struct{}

	fabricPathDn string
}
331

332
// serviceMeta is the value type of AciController.serviceMetaCache; it holds
// three lists of IP addresses.
type serviceMeta struct {
	requestedIps, ingressIps, staticIngressIps []net.IP
}
337

338
// ipIndexEntry pairs an IP network with a string-keyed boolean map. Its
// Network method returns ipNet.
type ipIndexEntry struct {
	ipNet net.IPNet

	keys map[string]bool
}
342

343
type targetPort struct {
344
        proto v1.Protocol
345
        ports []int
346
}
347

348
type portIndexEntry struct {
349
        port              targetPort
350
        serviceKeys       map[string]bool
351
        networkPolicyKeys map[string]bool
352
}
353

354
// portRangeSnat holds a pair of integers, start and end.
type portRangeSnat struct {
	start, end int
}
358

359
// EndPointData holds PodIF data in controller. It is the value type of
// AciController.podIftoEp; all fields are plain strings.
type EndPointData struct {
	MacAddr, EPG, Namespace, AppProfile string
}
366

367
// ctrPortNameEntry is the value type of AciController.ctrPortNameCache.
type ctrPortNameEntry struct {
	// ctrNmpToPods maps proto+port to a set of pods.
	ctrNmpToPods map[string]map[string]bool
}
371

372
// LinkData is the innermost value type of AdditionalNetworkMeta.FabricLink;
// it holds two string lists.
type LinkData struct {
	Link, Pods []string
}
376

377
// RoutedNodeData is the value type of RoutedNetworkData.nodeMap; it pairs
// an address string with an integer index.
type RoutedNodeData struct {
	addr string

	idx int
}
381

382
type RoutedNetworkData struct {
383
        subnet       string
384
        netAddress   string
385
        maskLen      int
386
        numAllocated int
387
        maxAddresses int
388
        baseAddress  net.IP
389
        nodeMap      map[string]RoutedNodeData
390
        availableMap map[int]bool
391
}
392

393
type AdditionalNetworkMeta struct {
394
        NetworkName string
395
        EncapVlan   string
396
        //node+localiface->fabricLinks
397
        FabricLink map[string]map[string]LinkData
398
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
399
        Mode       util.EncapMode
400
}
401

402
type ServiceEndPointType interface {
403
        InitClientInformer(kubeClient *kubernetes.Clientset)
404
        Run(stopCh <-chan struct{})
405
        Wait(stopCh <-chan struct{})
406
        UpdateServicesForNode(nodename string)
407
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
408
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
409
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
410
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
411
}
412

413
type serviceEndpoint struct {
414
        cont *AciController
415
}
416
type serviceEndpointSlice struct {
417
        cont *AciController
418
}
419

420
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
421
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
422
}
×
423

424
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
425
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
426
}
×
427

428
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
429
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
430
}
1✔
431

432
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
433
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
434
}
1✔
435

436
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
437
        cache.WaitForCacheSync(stopCh,
1✔
438
                sep.cont.endpointsInformer.HasSynced,
1✔
439
                sep.cont.serviceInformer.HasSynced)
1✔
440
}
1✔
441

442
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
443
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
444
        cache.WaitForCacheSync(stopCh,
1✔
445
                seps.cont.endpointSliceInformer.HasSynced,
1✔
446
                seps.cont.serviceInformer.HasSynced)
1✔
447
}
1✔
448

449
func (e *ipIndexEntry) Network() net.IPNet {
1✔
450
        return e.ipNet
1✔
451
}
1✔
452

453
func newNodePodNetMeta() *nodePodNetMeta {
1✔
454
        return &nodePodNetMeta{
1✔
455
                nodePods: make(map[string]bool),
1✔
456
        }
1✔
457
}
1✔
458

459
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
460
        return workqueue.NewNamedRateLimitingQueue(
1✔
461
                workqueue.NewMaxOfRateLimiter(
1✔
462
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
463
                                10*time.Second),
1✔
464
                        &workqueue.BucketRateLimiter{
1✔
465
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
466
                        },
1✔
467
                ),
1✔
468
                "delta")
1✔
469
}
1✔
470

471
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
472
        cont := &AciController{
1✔
473
                log:          log,
1✔
474
                config:       config,
1✔
475
                env:          env,
1✔
476
                defaultEg:    "",
1✔
477
                defaultSg:    "",
1✔
478
                unitTestMode: unittestmode,
1✔
479

1✔
480
                podQueue:              createQueue("pod"),
1✔
481
                netPolQueue:           createQueue("networkPolicy"),
1✔
482
                qosQueue:              createQueue("qos"),
1✔
483
                netflowQueue:          createQueue("netflow"),
1✔
484
                erspanQueue:           createQueue("erspan"),
1✔
485
                serviceQueue:          createQueue("service"),
1✔
486
                snatQueue:             createQueue("snat"),
1✔
487
                snatNodeInfoQueue:     createQueue("snatnodeinfo"),
1✔
488
                rdConfigQueue:         createQueue("rdconfig"),
1✔
489
                istioQueue:            createQueue("istio"),
1✔
490
                nodeFabNetAttQueue:    createQueue("nodefabricnetworkattachment"),
1✔
491
                netFabConfigQueue:     createQueue("networkfabricconfiguration"),
1✔
492
                nadVlanMapQueue:       createQueue("nadvlanmap"),
1✔
493
                fabricVlanPoolQueue:   createQueue("fabricvlanpool"),
1✔
494
                netFabL3ConfigQueue:   createQueue("networkfabricl3configuration"),
1✔
495
                remIpContQueue:        createQueue("remoteIpContainer"),
1✔
496
                epgDnCacheUpdateQueue: createQueue("epgDnCache"),
1✔
497
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
498
                        &workqueue.BucketRateLimiter{
1✔
499
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
500
                        }, "sync"),
1✔
501

1✔
502
                configuredPodNetworkIps: newNetIps(),
1✔
503
                podNetworkIps:           newNetIps(),
1✔
504
                serviceIps:              ipam.NewIpCache(),
1✔
505
                staticServiceIps:        newNetIps(),
1✔
506
                nodeServiceIps:          newNetIps(),
1✔
507

1✔
508
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
509
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
510
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
511

1✔
512
                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
1✔
513
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
1✔
514
                serviceMetaCache:            make(map[string]*serviceMeta),
1✔
515
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
1✔
516
                snatServices:                make(map[string]bool),
1✔
517
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
1✔
518
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
1✔
519
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
1✔
520
                podIftoEp:                   make(map[string]*EndPointData),
1✔
521
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
522
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
1✔
523
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
1✔
524
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
1✔
525
                nmPortNp:                    make(map[string]bool),
1✔
526
                hppRef:                      make(map[string]hppReference),
1✔
527
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
1✔
528
                sharedEncapCache:            make(map[int]*sharedEncapData),
1✔
529
                sharedEncapAepCache:         make(map[string]map[int]bool),
1✔
530
                sharedEncapSviCache:         make(map[int]*NfL3Data),
1✔
531
                sharedEncapVrfCache:         make(map[string]*NfVrfData),
1✔
532
                sharedEncapTenantCache:      make(map[string]*NfTenantData),
1✔
533
                sharedEncapNfcCache:         make(map[int]*NfcData),
1✔
534
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
1✔
535
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
1✔
536
                sharedEncapNfcAppProfileMap: make(map[string]bool),
1✔
537
                sharedEncapLabelMap:         make(map[string][]int),
1✔
538
                lldpIfCache:                 make(map[string]*NfLLDPIfData),
1✔
539
                fabricVlanPoolMap:           make(map[string]map[string]string),
1✔
540
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
1✔
541
                hostFabricPathDnMap:         make(map[string]hostFabricInfo),
1✔
542
                nsRemoteIpCont:              make(map[string]remoteIpConts),
1✔
543
        }
1✔
544
        cont.syncProcessors = map[string]func() bool{
1✔
545
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
546
                "rdConfig":       cont.syncRdConfig,
1✔
547
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
548
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
549
                   and aci-containers-operator images for istio.io/istio package
1✔
550

1✔
551
                "istioCR":        cont.createIstioCR,
1✔
552
                */
1✔
553
        }
1✔
554
        return cont
1✔
555
}
1✔
556

557
func (cont *AciController) Init() {
×
558
        if cont.config.ChainedMode {
×
559
                cont.log.Info("In chained mode")
×
560
        }
×
561

562
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
563
        if err != nil {
×
564
                cont.log.Error("Could not serialize default endpoint group")
×
565
                panic(err.Error())
×
566
        }
567
        cont.defaultEg = string(egdata)
×
568

×
569
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
570
        if err != nil {
×
571
                cont.log.Error("Could not serialize default security groups")
×
572
                panic(err.Error())
×
573
        }
574
        cont.defaultSg = string(sgdata)
×
575

×
576
        cont.log.Debug("Initializing IPAM")
×
577
        cont.initIpam()
×
578
        // check if the cluster supports endpoint slices
×
579
        // if cluster doesn't have the support fallback to endpoints
×
580
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
581
        if util.IsEndPointSlicesSupported(kubeClient) {
×
582
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
583
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
584
                cont.log.Info("Initializing ServiceEndpointSlices")
×
585
        } else {
×
586
                cont.serviceEndPoints = &serviceEndpoint{}
×
587
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
588
                cont.log.Info("Initializing ServiceEndpoints")
×
589
        }
×
590

591
        err = cont.env.Init(cont)
×
592
        if err != nil {
×
593
                panic(err.Error())
×
594
        }
595
}
596

597
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
598
        store cache.Store, handler func(interface{}) bool,
599
        deleteHandler func(string) bool,
600
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
601
        go wait.Until(func() {
2✔
602
                for {
2✔
603
                        key, quit := queue.Get()
1✔
604
                        if quit {
2✔
605
                                break
1✔
606
                        }
607

608
                        var requeue bool
1✔
609
                        switch key := key.(type) {
1✔
610
                        case chan struct{}:
×
611
                                close(key)
×
612
                        case string:
1✔
613
                                if strings.HasPrefix(key, "DELETED_") {
2✔
614
                                        delKey := strings.Trim(key, "DELETED_")
1✔
615
                                        requeue = deleteHandler(delKey)
1✔
616
                                } else {
2✔
617
                                        obj, exists, err := store.GetByKey(key)
1✔
618
                                        if err != nil {
1✔
619
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
620
                                        }
×
621
                                        //Handle Add/Update/Delete
622
                                        if exists && handler != nil {
2✔
623
                                                requeue = handler(obj)
1✔
624
                                        }
1✔
625
                                        //Handle Post Delete
626
                                        if !exists && postDelHandler != nil {
1✔
627
                                                requeue = postDelHandler()
×
628
                                        }
×
629
                                }
630
                        }
631
                        if requeue {
2✔
632
                                queue.AddRateLimited(key)
1✔
633
                        } else {
2✔
634
                                queue.Forget(key)
1✔
635
                        }
1✔
636
                        queue.Done(key)
1✔
637
                }
638
        }, time.Second, stopCh)
639
        <-stopCh
1✔
640
        queue.ShutDown()
1✔
641
}
642

643
func (cont *AciController) processRemIpContQueue(queue workqueue.RateLimitingInterface,
644
        handler func(interface{}) bool,
645
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
646
        go wait.Until(func() {
2✔
647
                for {
2✔
648
                        key, quit := queue.Get()
1✔
649
                        if quit {
2✔
650
                                break
1✔
651
                        }
652

653
                        var requeue bool
1✔
654
                        switch key := key.(type) {
1✔
655
                        case chan struct{}:
×
656
                                close(key)
×
657
                        case string:
1✔
658
                                if handler != nil {
2✔
659
                                        requeue = handler(key)
1✔
660
                                }
1✔
661
                                if postDelHandler != nil {
2✔
662
                                        requeue = postDelHandler()
1✔
663
                                }
1✔
664
                        }
665
                        if requeue {
1✔
666
                                queue.AddRateLimited(key)
×
667
                        } else {
1✔
668
                                queue.Forget(key)
1✔
669
                        }
1✔
670
                        queue.Done(key)
1✔
671

672
                }
673
        }, time.Second, stopCh)
674
        <-stopCh
1✔
675
        queue.ShutDown()
1✔
676
}
677

678
func (cont *AciController) processEpgDnCacheUpdateQueue(queue workqueue.RateLimitingInterface,
679
        handler func(interface{}) bool,
680
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
681
        go wait.Until(func() {
2✔
682
                for {
2✔
683
                        key, quit := queue.Get()
1✔
684
                        if quit {
2✔
685
                                break
1✔
686
                        }
687

688
                        var requeue bool
1✔
689
                        switch key := key.(type) {
1✔
690
                        case chan struct{}:
×
691
                                close(key)
×
692
                        case bool:
1✔
693
                                if handler != nil {
2✔
694
                                        requeue = handler(key)
1✔
695
                                }
1✔
696
                                if postDelHandler != nil {
1✔
697
                                        requeue = postDelHandler()
×
698
                                }
×
699
                        }
700
                        if requeue {
1✔
701
                                queue.AddRateLimited(key)
×
702
                        } else {
1✔
703
                                queue.Forget(key)
1✔
704
                        }
1✔
705
                        queue.Done(key)
1✔
706

707
                }
708
        }, time.Second, stopCh)
709
        <-stopCh
1✔
710
        queue.ShutDown()
1✔
711
}
712

713
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
714
        return apicapi.ApicSlice{}
1✔
715
}
1✔
716

717
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
718
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
719
}
1✔
720

721
func (cont *AciController) initStaticObjs() {
1✔
722
        cont.env.InitStaticAciObjects()
1✔
723
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
724
                cont.globalStaticObjs())
1✔
725
}
1✔
726

727
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
728
        vmmProv = "Kubernetes"
1✔
729
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
730
                vmmProv = "OpenShift"
×
731
        }
×
732
        return
1✔
733
}
734

735
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
736
        var err error
1✔
737
        var privKey []byte
1✔
738
        var apicCert []byte
1✔
739

1✔
740
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
741

1✔
742
        if cont.config.ApicPrivateKeyPath != "" {
1✔
743
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
744
                if err != nil {
×
745
                        panic(err)
×
746
                }
747
        }
748
        if cont.config.ApicCertPath != "" {
1✔
749
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
750
                if err != nil {
×
751
                        panic(err)
×
752
                }
753
        }
754
        // If not defined, default is 1800
755
        if cont.config.ApicRefreshTimer == "" {
2✔
756
                cont.config.ApicRefreshTimer = "1800"
1✔
757
        }
1✔
758
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
759
        if err != nil {
1✔
760
                panic(err)
×
761
        }
762
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
763

1✔
764
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
765
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
766
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
767
                panic(err)
×
768
        }
769

770
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
771
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
772
                cont.config.ApicRefreshTickerAdjust = "210"
1✔
773
        }
1✔
774
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
775
        if err != nil {
1✔
776
                panic(err)
×
777
        }
778

779
        //If ApicSubscriptionDelay is not defined, default to 100ms
780
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
781
                cont.config.ApicSubscriptionDelay = 100
1✔
782
        }
1✔
783
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
784

1✔
785
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
786
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
787
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
788
        }
1✔
789

790
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
791
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
792
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
793
        }
1✔
794
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
795

1✔
796
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
797
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
798
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
799
        }
1✔
800

801
        // If not defined, default to 32
802
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
803
                cont.config.PodIpPoolChunkSize = 32
1✔
804
        }
1✔
805
        if !cont.config.ChainedMode {
2✔
806
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
807
        }
1✔
808

809
        // If ApicConnectionRetryLimit is not defined, default to 5
810
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
811
                cont.config.ApicConnectionRetryLimit = 5
1✔
812
        }
1✔
813
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
814

1✔
815
        // If not valid, default to 5000-65000
1✔
816
        // other permissible values 1-65000
1✔
817
        defStart := 5000
1✔
818
        defEnd := 65000
1✔
819
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
820
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
821
        }
1✔
822
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
823
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
824
        }
1✔
825
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
826
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
827
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
828
                cont.config.SnatDefaultPortRangeStart = defStart
×
829
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
830
        }
×
831

832
        // Set default value for pbr programming delay if services list is not empty
833
        // and delay value is empty
834
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
835
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
836
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
837
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
838
        }
×
839
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
840
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
841
        }
×
842

843
        // Set contract scope for snat svc graph to global by default
844
        if cont.config.SnatSvcContractScope == "" {
2✔
845
                cont.config.SnatSvcContractScope = "global"
1✔
846
        }
1✔
847
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
848
                cont.config.MaxSvcGraphNodes = 32
1✔
849
        }
1✔
850
        if !cont.config.ChainedMode {
2✔
851
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
852
        }
1✔
853
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
854
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
855
                privKey, apicCert, cont.config.AciPrefix,
1✔
856
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
857
                cont.config.AciVrfTenant, cont.UpdateLLDPIfLocked)
1✔
858
        if err != nil {
1✔
859
                panic(err)
×
860
        }
861

862
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
863
        cont.apicConn.RequestRetryDelay = cont.config.ApicRequestRetryDelay
1✔
864
        cont.apicConn.EnableRequestRetry = cont.config.EnableApicRequestRetry
1✔
865

1✔
866
        if len(cont.config.ApicHosts) != 0 {
1✔
867
        APIC_SWITCH:
×
868
                cont.log.WithFields(logrus.Fields{
×
869
                        "mod":  "APICAPI",
×
870
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
871
                }).Debug("Connecting to APIC to determine the Version")
×
872

×
873
                version, err := cont.apicConn.GetVersion()
×
874
                if err != nil {
×
875
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
876
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
877
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
878
                        goto APIC_SWITCH
×
879
                }
880
                cont.apicConn.CachedVersion = version
×
881
                apicapi.ApicVersion = version
×
882
                if version >= "4.2(4i)" {
×
883
                        cont.apicConn.SnatPbrFltrChain = true
×
884
                } else {
×
885
                        cont.apicConn.SnatPbrFltrChain = false
×
886
                }
×
887
                if version >= "5.2" {
×
888
                        cont.vmmClusterFaultSupported = true
×
889
                }
×
890
        } else { // For unit-tests
1✔
891
                cont.apicConn.SnatPbrFltrChain = true
1✔
892
        }
1✔
893

894
        if !cont.config.ChainedMode {
2✔
895
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
896
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
897
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
898
                        var expectedVrfRelations []string
×
899
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
900
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
901
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
902
                        if err != nil {
×
903
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
904
                                panic(err)
×
905
                        }
906
                }
907
        }
908

909
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.config.ChainedMode {
1✔
910
                //Clear fault instances when the controller starts
×
911
                cont.clearFaultInstances()
×
912
                //Subscribe for vmmEpPD for a given domain
×
913
                var tnTargetFilterEpg string
×
914
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
915
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
916
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
917
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
918
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
919
                        func(obj apicapi.ApicObject) bool {
×
920
                                cont.vmmEpPDChanged(obj)
×
921
                                return true
×
922
                        },
×
923
                        func(dn string) {
×
924
                                cont.vmmEpPDDeleted(dn)
×
925
                        })
×
926
        }
927

928
        cont.initStaticObjs()
1✔
929

1✔
930
        err = cont.env.PrepareRun(stopCh)
1✔
931
        if err != nil {
1✔
932
                panic(err.Error())
×
933
        }
934

935
        cont.apicConn.FullSyncHook = func() {
1✔
936
                // put a channel into each work queue and wait on it to
×
937
                // checkpoint object syncing in response to new subscription
×
938
                // updates
×
939
                cont.log.Debug("Starting checkpoint")
×
940
                var chans []chan struct{}
×
941
                qs := make([]workqueue.RateLimitingInterface, 0)
×
942
                _, ok := cont.env.(*K8sEnvironment)
×
943
                if ok {
×
944
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
945
                        if !cont.config.ChainedMode {
×
946
                                if !cont.config.DisableHppRendering {
×
947
                                        qs = append(qs, cont.netPolQueue)
×
948
                                }
×
949
                                if cont.config.EnableHppDirect {
×
950
                                        qs = append(qs, cont.remIpContQueue)
×
951
                                }
×
952
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
953
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
954
                                        cont.rdConfigQueue, cont.erspanQueue,
×
955
                                        cont.epgDnCacheUpdateQueue)
×
956
                        }
957
                }
958
                for _, q := range qs {
×
959
                        c := make(chan struct{})
×
960
                        chans = append(chans, c)
×
961
                        q.Add(c)
×
962
                }
×
963
                for _, c := range chans {
×
964
                        <-c
×
965
                }
×
966
                cont.log.Debug("Checkpoint complete")
×
967
        }
968

969
        if len(cont.config.ApicHosts) != 0 && !cont.config.ChainedMode {
1✔
970
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
971
                cont.scheduleRdConfig()
×
972
                if strings.Contains(cont.config.Flavor, "openstack") {
×
973
                        cont.setOpenStackSystemId()
×
974
                }
×
975
        }
976

977
        if !cont.config.ChainedMode {
2✔
978
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
979
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
980
                                []string{"hostprotPol"})
1✔
981
                }
1✔
982
        } else {
1✔
983
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
984
                        []string{"fvBD", "fvAp"})
1✔
985
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
986
                        []string{"fvnsVlanInstP"}, "")
1✔
987
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
988
                        []string{"infraRsDomP"}, "")
1✔
989
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
990
                        []string{"physDomP"}, "")
1✔
991
                cont.apicConn.AddSubscriptionClass("l3extDomP",
1✔
992
                        []string{"l3extDomP"}, "")
1✔
993
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
994
                        []string{"infraRsVlanNs"}, "")
1✔
995
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
996
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
997
                cont.apicConn.AddSubscriptionClass("l3extOut",
1✔
998
                        []string{"l3extInstP", "l3extSubnet", "fvRsCons", "fvRsProv", "l3extRsEctx", "l3extRsL3DomAtt", "l3extLNodeP", "l3extRsNodeL3OutAtt", "ipRouteP", "ipNexthopP", "l3extLIfP", "l3extVirtualLIfP", "l3extRsDynPathAtt",
1✔
999
                                "l3extRsPathL3OutAtt", "l3extMember", "l3extIp", "bgpExtP", "bgpPeerP", "bgpAsP", "bgpLocalAsnP", "bgpRsPeerPfxPol"}, "")
1✔
1000
                cont.apicConn.AddSubscriptionClass("bgpPeerPfxPol",
1✔
1001
                        []string{"bgpPeerPfxPol"}, "")
1✔
1002
        }
1✔
1003
        if !cont.config.ChainedMode {
2✔
1004
                // When a new class is added for subscriptio, check if its name attribute
1✔
1005
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
1006
                // in apicapi.go
1✔
1007
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
1008
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
1009
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
1010
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
1011
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
1012
                }
×
1013
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
1014
                        subscribeMo)
1✔
1015
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
1016
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
1017
                        []string{"fvRsCons"})
1✔
1018
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
1019
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
1020
                        cont.config.AciVmmController)
1✔
1021
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
1022
                // Since it is not supported for APIC versions < "5.0"
1✔
1023
                cont.addVmmInjectedLabel()
1✔
1024
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
1025
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
1026

1✔
1027
                var tnTargetFilter string
1✔
1028
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
1029
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
1030
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
1031
                        }
×
1032
                } else {
1✔
1033
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
1034
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
1035
                }
1✔
1036
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
1037
                        tnTargetFilter)
1✔
1038
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
1039
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
1040

1✔
1041
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
1042
                        func(obj apicapi.ApicObject) bool {
1✔
1043
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
1044
                                return true
×
1045
                        },
×
1046
                        func(dn string) {
×
1047
                                cont.SubnetDeleted(dn)
×
1048
                        })
×
1049

1050
                var opflexODevFilter string
1✔
1051
                if strings.Contains(cont.config.Flavor, "openstack") {
1✔
1052
                        opflexODevFilter = fmt.Sprintf("or(eq(opflexODev.domName,\"%s\"),wcard(opflexODev.compHvDn,\"prov-OpenStack\"))", cont.config.AciVmmDomain)
×
1053
                } else {
1✔
1054
                        opflexODevFilter = fmt.Sprintf("eq(opflexODev.domName,\"%s\")", cont.config.AciVmmDomain)
1✔
1055
                }
1✔
1056
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
1057
                        []string{"opflexODev"}, opflexODevFilter)
1✔
1058

1✔
1059
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
1060
                        func(obj apicapi.ApicObject) bool {
1✔
1061
                                cont.opflexDeviceChanged(obj)
×
1062
                                return true
×
1063
                        },
×
1064
                        func(dn string) {
×
1065
                                cont.opflexDeviceDeleted(dn)
×
1066
                        })
×
1067

1068
                if !cont.config.DisableServiceVlanPreprovisioning && !strings.Contains(cont.config.Flavor, "openstack") {
2✔
1069
                        if cont.config.AEP == "" {
2✔
1070
                                cont.log.Error("AEP is missing in configuration, preprovisioning of service vlan will be disabled")
1✔
1071
                        } else {
1✔
1072
                                infraRtAttEntPFilter := fmt.Sprintf("and(wcard(infraRtAttEntP.dn,\"/attentp-%s/\"))", cont.config.AEP)
×
1073
                                cont.apicConn.AddSubscriptionClass("infraRtAttEntP",
×
1074
                                        []string{"infraRtAttEntP"}, infraRtAttEntPFilter)
×
1075

×
1076
                                // For bare metal, the infraRtAttEntP associated with an AEP will be empty.
×
1077
                                // We should not receive any updates for such cases.
×
1078
                                cont.apicConn.SetSubscriptionHooks("infraRtAttEntP",
×
1079
                                        func(obj apicapi.ApicObject) bool {
×
1080
                                                cont.infraRtAttEntPChanged(obj)
×
1081
                                                return true
×
1082
                                        },
×
1083
                                        func(dn string) {
×
1084
                                                cont.infraRtAttEntPDeleted(dn)
×
1085
                                        })
×
1086

NEW
1087
                                cont.apicConn.AddSubscriptionClass("vpcIf",
×
NEW
1088
                                        []string{"vpcIf"}, "")
×
NEW
1089

×
NEW
1090
                                cont.apicConn.SetSubscriptionHooks("vpcIf",
×
NEW
1091
                                        func(obj apicapi.ApicObject) bool {
×
NEW
1092
                                                cont.vpcIfChanged(obj)
×
NEW
1093
                                                return true
×
NEW
1094
                                        },
×
NEW
1095
                                        func(dn string) {
×
NEW
1096
                                                cont.vpcIfDeleted(dn)
×
NEW
1097
                                        })
×
1098
                        }
1099
                }
1100

1101
                cont.apicConn.VersionUpdateHook =
1✔
1102
                        func() {
1✔
1103
                                cont.initStaticServiceObjs()
×
1104
                        }
×
1105
        }
1106
        go cont.apicConn.Run(stopCh)
1✔
1107
}
1108

1109
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1110
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
1111
        ticker := time.NewTicker(seconds * time.Second)
1✔
1112
        defer ticker.Stop()
1✔
1113

1✔
1114
        for {
2✔
1115
                select {
1✔
1116
                case <-ticker.C:
1✔
1117
                        if cont.config.EnableOpflexAgentReconnect {
1✔
1118
                                cont.checkChangeOfOpflexOdevAciPod()
×
1119
                        }
×
1120
                        if cont.config.AciMultipod {
1✔
1121
                                cont.checkChangeOfOdevAciPod()
×
1122
                        }
×
1123
                case <-stopCh:
1✔
1124
                        return
1✔
1125
                }
1126
        }
1127
}
1128

1129
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1130
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
1131
        ticker := time.NewTicker(seconds * time.Second)
1✔
1132
        defer ticker.Stop()
1✔
1133

1✔
1134
        for {
2✔
1135
                select {
1✔
1136
                case <-ticker.C:
1✔
1137
                        cont.deleteOldOpflexDevices()
1✔
1138
                case <-stopCh:
1✔
1139
                        return
1✔
1140
                }
1141
        }
1142
}
1143

1144
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1145
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
1146
        ticker := time.NewTicker(seconds * time.Second)
1✔
1147
        defer ticker.Stop()
1✔
1148

1✔
1149
        for {
2✔
1150
                select {
1✔
1151
                case <-ticker.C:
1✔
1152
                        cont.processDelayedEpSlices()
1✔
1153
                case <-stopCh:
1✔
1154
                        return
1✔
1155
                }
1156
        }
1157
}
1158

1159
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
1160
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
1161
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
1162
        iteration := 0
1✔
1163
        for {
2✔
1164
                // To avoid noisy logs, only printing once in 5 minutes
1✔
1165
                if iteration%5 == 0 {
2✔
1166
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
1167
                }
1✔
1168
                var nodeInfos []*nodeinfo.NodeInfo
1✔
1169
                cont.indexMutex.Lock()
1✔
1170
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
1171
                        func(nodeInfoObj interface{}) {
2✔
1172
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
1173
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
1174
                        })
1✔
1175
                expectedmap := make(map[string]map[string]bool)
1✔
1176
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
1177
                        for nodename, entry := range glinfo {
2✔
1178
                                if _, found := expectedmap[nodename]; !found {
2✔
1179
                                        newentry := make(map[string]bool)
1✔
1180
                                        newentry[entry.SnatPolicyName] = true
1✔
1181
                                        expectedmap[nodename] = newentry
1✔
1182
                                } else {
2✔
1183
                                        currententry := expectedmap[nodename]
1✔
1184
                                        currententry[entry.SnatPolicyName] = true
1✔
1185
                                        expectedmap[nodename] = currententry
1✔
1186
                                }
1✔
1187
                        }
1188
                }
1189
                cont.indexMutex.Unlock()
1✔
1190

1✔
1191
                for _, value := range nodeInfos {
2✔
1192
                        marked := false
1✔
1193
                        policyNames := value.Spec.SnatPolicyNames
1✔
1194
                        nodeName := value.ObjectMeta.Name
1✔
1195
                        _, ok := expectedmap[nodeName]
1✔
1196
                        if !ok && len(policyNames) > 0 {
2✔
1197
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
1198
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
1199
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1200
                                marked = true
1✔
1201
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
1202
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1203
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1204
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1205
                                marked = true
1✔
1206
                        } else {
2✔
1207
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
1208
                                        // No snatpolicies present
×
1209
                                        continue
×
1210
                                }
1211
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
1212
                                if !eq {
2✔
1213
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1214
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1215
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1216
                                        marked = true
1✔
1217
                                }
1✔
1218
                        }
1219
                        if marked {
2✔
1220
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
1221
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
1222
                                if err != nil {
1✔
1223
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
1224
                                        continue
×
1225
                                }
1226
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1227
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1228
                        } else if iteration%5 == 0 {
2✔
1229
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1230
                        }
1✔
1231
                }
1232
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1233
                iteration++
1✔
1234
        }
1235
}
1236

1237
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1238
        queueStop <-chan struct{}) {
1✔
1239
        go wait.Until(func() {
2✔
1240
                for {
2✔
1241
                        syncType, quit := queue.Get()
1✔
1242
                        if quit {
2✔
1243
                                break
1✔
1244
                        }
1245
                        var requeue bool
1✔
1246
                        if sType, ok := syncType.(string); ok {
2✔
1247
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1248
                                        requeue = f()
1✔
1249
                                }
1✔
1250
                        }
1251
                        if requeue {
1✔
1252
                                queue.AddRateLimited(syncType)
×
1253
                        } else {
1✔
1254
                                queue.Forget(syncType)
1✔
1255
                        }
1✔
1256
                        queue.Done(syncType)
1✔
1257
                }
1258
        }, time.Second, queueStop)
1259
        <-queueStop
1✔
1260
        queue.ShutDown()
1✔
1261
}
1262

1263
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1264
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1265
}
1✔
1266
func (cont *AciController) scheduleRdConfig() {
×
1267
        cont.syncQueue.AddRateLimited("rdConfig")
×
1268
}
×
1269
func (cont *AciController) scheduleCreateIstioCR() {
×
1270
        cont.syncQueue.AddRateLimited("istioCR")
×
1271
}
×
1272

1273
func (cont *AciController) addVmmInjectedLabel() {
1✔
1274
        if apicapi.ApicVersion >= "5.2" {
1✔
1275
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1276
                if err != nil {
×
1277
                        panic(err.Error())
×
1278
                }
1279
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1280
                if err != nil {
×
1281
                        panic(err.Error())
×
1282
                }
1283
        }
1284
        if apicapi.ApicVersion >= "5.0" {
2✔
1285
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1286
                if err != nil {
1✔
1287
                        panic(err.Error())
×
1288
                }
1289
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1290
                if err != nil {
1✔
1291
                        panic(err.Error())
×
1292
                }
1293
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1294
                if err != nil {
1✔
1295
                        panic(err.Error())
×
1296
                }
1297
        }
1298
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc