• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 10628

16 May 2025 06:30PM UTC coverage: 68.723% (-0.07%) from 68.789%
10628

push

travis-pro

web-flow
Merge pull request #1522 from noironetworks/service-vlan-fix-backport

Skip service VLAN pre-provisioning for single leaf

0 of 13 new or added lines in 2 files covered. (0.0%)

11 existing lines in 4 files now uncovered.

13324 of 19388 relevant lines covered (68.72%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.86
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        v1net "k8s.io/api/networking/v1"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/util/workqueue"
40

41
        "github.com/noironetworks/aci-containers/pkg/apicapi"
42
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
43
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
44
        hppv1 "github.com/noironetworks/aci-containers/pkg/hpp/apis/aci.hpp/v1"
45
        "github.com/noironetworks/aci-containers/pkg/index"
46
        "github.com/noironetworks/aci-containers/pkg/ipam"
47
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
48
        "github.com/noironetworks/aci-containers/pkg/metadata"
49
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
50
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
51
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
52
        "github.com/noironetworks/aci-containers/pkg/util"
53
)
54

55
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
56
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
57
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
58
type listNetworkPoliciesFunc func(string) (*v1net.NetworkPolicyList, error)
59
type listNamespacesFunc func() (*v1.NamespaceList, error)
60

61
type AciController struct {
62
        log    *logrus.Logger
63
        config *ControllerConfig
64
        env    Environment
65

66
        defaultEg string
67
        defaultSg string
68

69
        unitTestMode bool
70

71
        podQueue              workqueue.RateLimitingInterface
72
        netPolQueue           workqueue.RateLimitingInterface
73
        qosQueue              workqueue.RateLimitingInterface
74
        serviceQueue          workqueue.RateLimitingInterface
75
        snatQueue             workqueue.RateLimitingInterface
76
        netflowQueue          workqueue.RateLimitingInterface
77
        erspanQueue           workqueue.RateLimitingInterface
78
        snatNodeInfoQueue     workqueue.RateLimitingInterface
79
        rdConfigQueue         workqueue.RateLimitingInterface
80
        istioQueue            workqueue.RateLimitingInterface
81
        nodeFabNetAttQueue    workqueue.RateLimitingInterface
82
        netFabConfigQueue     workqueue.RateLimitingInterface
83
        nadVlanMapQueue       workqueue.RateLimitingInterface
84
        fabricVlanPoolQueue   workqueue.RateLimitingInterface
85
        netFabL3ConfigQueue   workqueue.RateLimitingInterface
86
        remIpContQueue        workqueue.RateLimitingInterface
87
        epgDnCacheUpdateQueue workqueue.RateLimitingInterface
88

89
        namespaceIndexer                     cache.Indexer
90
        namespaceInformer                    cache.Controller
91
        podIndexer                           cache.Indexer
92
        podInformer                          cache.Controller
93
        endpointsIndexer                     cache.Indexer
94
        endpointsInformer                    cache.Controller
95
        serviceIndexer                       cache.Indexer
96
        serviceInformer                      cache.Controller
97
        replicaSetIndexer                    cache.Indexer
98
        replicaSetInformer                   cache.Controller
99
        deploymentIndexer                    cache.Indexer
100
        deploymentInformer                   cache.Controller
101
        nodeIndexer                          cache.Indexer
102
        nodeInformer                         cache.Controller
103
        networkPolicyIndexer                 cache.Indexer
104
        networkPolicyInformer                cache.Controller
105
        snatIndexer                          cache.Indexer
106
        snatInformer                         cache.Controller
107
        snatNodeInfoIndexer                  cache.Indexer
108
        snatNodeInformer                     cache.Controller
109
        snatLocalInfoInformer                cache.Controller
110
        crdInformer                          cache.Controller
111
        rdConfigInformer                     cache.Controller
112
        rdConfigIndexer                      cache.Indexer
113
        qosIndexer                           cache.Indexer
114
        qosInformer                          cache.Controller
115
        netflowIndexer                       cache.Indexer
116
        netflowInformer                      cache.Controller
117
        erspanIndexer                        cache.Indexer
118
        erspanInformer                       cache.Controller
119
        nodePodIfIndexer                     cache.Indexer
120
        nodePodIfInformer                    cache.Controller
121
        istioIndexer                         cache.Indexer
122
        istioInformer                        cache.Controller
123
        endpointSliceIndexer                 cache.Indexer
124
        endpointSliceInformer                cache.Controller
125
        snatCfgInformer                      cache.Controller
126
        updatePod                            podUpdateFunc
127
        updateNode                           nodeUpdateFunc
128
        updateServiceStatus                  serviceUpdateFunc
129
        listNetworkPolicies                  listNetworkPoliciesFunc
130
        listNamespaces                       listNamespacesFunc
131
        nodeFabNetAttInformer                cache.SharedIndexInformer
132
        netFabConfigInformer                 cache.SharedIndexInformer
133
        nadVlanMapInformer                   cache.SharedIndexInformer
134
        fabricVlanPoolInformer               cache.SharedIndexInformer
135
        networkFabricL3ConfigurationInformer cache.SharedIndexInformer
136
        fabNetAttClient                      *fabattclset.Clientset
137
        proactiveConfInformer                cache.SharedIndexInformer
138

139
        indexMutex sync.Mutex
140
        hppMutex   sync.Mutex
141

142
        configuredPodNetworkIps *netIps
143
        podNetworkIps           *netIps
144
        serviceIps              *ipam.IpCache
145
        staticServiceIps        *netIps
146
        nodeServiceIps          *netIps
147

148
        // index of pods matched by deployments
149
        depPods *index.PodSelectorIndex
150
        // index of pods matched by network policies
151
        netPolPods *index.PodSelectorIndex
152
        // index of pods matched by network policy ingress rules
153
        netPolIngressPods *index.PodSelectorIndex
154
        // index of pods matched by network policy egress rules
155
        netPolEgressPods *index.PodSelectorIndex
156
        // index of IP addresses contained in endpoints objects
157
        endpointsIpIndex cidranger.Ranger
158
        // index of service target ports
159
        targetPortIndex map[string]*portIndexEntry
160
        // index of ip blocks referenced by network policy egress rules
161
        netPolSubnetIndex cidranger.Ranger
162
        // index of pods matched by erspan policies
163
        erspanPolPods *index.PodSelectorIndex
164

165
        apicConn *apicapi.ApicConnection
166

167
        nodeServiceMetaCache map[string]*nodeServiceMeta
168
        nodeACIPod           map[string]aciPodAnnot
169
        nodeACIPodAnnot      map[string]aciPodAnnot
170
        nodeOpflexDevice     map[string]apicapi.ApicSlice
171
        nodePodNetCache      map[string]*nodePodNetMeta
172
        serviceMetaCache     map[string]*serviceMeta
173
        snatPolicyCache      map[string]*ContSnatPolicy
174
        delayedEpSlices      []*DelayedEpSlice
175
        snatServices         map[string]bool
176
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
177
        rdConfigCache        map[string]*rdConfig.RdConfig
178
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
179
        istioCache           map[string]*istiov1.AciIstioOperator
180
        podIftoEp            map[string]*EndPointData
181
        // Node Name and Policy Name
182
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
183
        nodeSyncEnabled     bool
184
        serviceSyncEnabled  bool
185
        snatSyncEnabled     bool
186
        syncQueue           workqueue.RateLimitingInterface
187
        syncProcessors      map[string]func() bool
188
        serviceEndPoints    ServiceEndPointType
189
        crdHandlers         map[string]func(*AciController, <-chan struct{})
190
        stopCh              <-chan struct{}
191
        //index of containerportname to ctrPortNameEntry
192
        ctrPortNameCache map[string]*ctrPortNameEntry
193
        // named networkPolicies
194
        nmPortNp map[string]bool
195
        //maps network policy hash to hpp
196
        hppRef map[string]hppReference
197
        //map for ns to remoteIpConts
198
        nsRemoteIpCont map[string]remoteIpConts
199
        // cache to look for Epg DNs which are bound to Vmm domain
200
        cachedEpgDns             []string
201
        vmmClusterFaultSupported bool
202
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
203
        //Used in Shared mode
204
        sharedEncapCache       map[int]*sharedEncapData
205
        sharedEncapAepCache    map[string]map[int]bool
206
        sharedEncapSviCache    map[int]*NfL3Data
207
        sharedEncapVrfCache    map[string]*NfVrfData
208
        sharedEncapTenantCache map[string]*NfTenantData
209
        nfl3configGenerationId int64
210
        // vlan to propertiesList
211
        sharedEncapNfcCache         map[int]*NfcData
212
        sharedEncapNfcVlanMap       map[int]*NfcData
213
        sharedEncapNfcLabelMap      map[string]*NfcData
214
        sharedEncapNfcAppProfileMap map[string]bool
215
        // nadVlanMap encapLabel to vlan
216
        sharedEncapLabelMap      map[string][]int
217
        lldpIfCache              map[string]*NfLLDPIfData
218
        globalVlanConfig         globalVlanConfig
219
        fabricVlanPoolMap        map[string]map[string]string
220
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
221
        hostFabricPathDnMap      map[string]string
222
        openStackSystemId        string
223
}
224

225
type NfLLDPIfData struct {
226
        LLDPIf string
227
        // As of now, manage at the NAD level
228
        // more granular introduces intf tracking complexities
229
        // for not sufficient benefits
230
        Refs map[string]bool
231
}
232

233
type NfL3OutData struct {
234
        // +kubebuilder:validation:Enum:"import"
235
        RtCtrl     string
236
        PodId      int
237
        RtrNodeMap map[int]*fabattv1.FabricL3OutRtrNode
238
        ExtEpgMap  map[string]*fabattv1.PolicyPrefixGroup
239
        SviMap     map[int]bool
240
}
241

242
type NfTenantData struct {
243
        CommonTenant     bool
244
        L3OutConfig      map[string]*NfL3OutData
245
        BGPPeerPfxConfig map[string]*fabattv1.BGPPeerPrefixPolicy
246
}
247

248
type NfVrfData struct {
249
        TenantConfig map[string]*NfTenantData
250
}
251

252
type NfL3Networks struct {
253
        fabattv1.PrimaryNetwork
254
        Subnets map[string]*fabattv1.FabricL3Subnet
255
}
256

257
type NfL3Data struct {
258
        Tenant      string
259
        Vrf         fabattv1.VRF
260
        PodId       int
261
        ConnectedNw *NfL3Networks
262
        NetAddr     map[string]*RoutedNetworkData
263
        Nodes       map[int]fabattv1.FabricL3OutNode
264
}
265

266
// maps pod name to remoteIpCont
267
type remoteIpConts map[string]remoteIpCont
268

269
// remoteIpCont maps ip to pod labels
270
type remoteIpCont map[string]map[string]string
271

272
type NfcData struct {
273
        Aeps map[string]bool
274
        Epg  fabattv1.Epg
275
}
276

277
type sharedEncapData struct {
278
        //node to NAD to pods
279
        Pods   map[string]map[string][]string
280
        NetRef map[string]*AdditionalNetworkMeta
281
        Aeps   map[string]bool
282
}
283

284
type globalVlanConfig struct {
285
        SharedPhysDom apicapi.ApicObject
286
        SharedL3Dom   apicapi.ApicObject
287
}
288

289
type hppReference struct {
290
        RefCount uint              `json:"ref-count,omitempty"`
291
        Npkeys   []string          `json:"npkeys,omitempty"`
292
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
293
        HppCr    hppv1.HostprotPol `json:"hpp-cr,omitempty"`
294
}
295

296
type DelayedEpSlice struct {
297
        ServiceKey  string
298
        OldEpSlice  *discovery.EndpointSlice
299
        NewEpSlice  *discovery.EndpointSlice
300
        DelayedTime time.Time
301
}
302

303
type aciPodAnnot struct {
304
        aciPod             string
305
        isSingleOpflexOdev bool
306
        disconnectTime     time.Time
307
        connectTime        time.Time
308
        lastErrorTime      time.Time
309
}
310

311
type nodeServiceMeta struct {
312
        serviceEp metadata.ServiceEndpoint
313
}
314

315
type nodePodNetMeta struct {
316
        nodePods            map[string]bool
317
        podNetIps           metadata.NetIps
318
        podNetIpsAnnotation string
319
}
320

321
type openstackOpflexOdevInfo struct {
322
        opflexODevDn map[string]struct{}
323
        fabricPathDn string
324
}
325

326
type serviceMeta struct {
327
        requestedIps     []net.IP
328
        ingressIps       []net.IP
329
        staticIngressIps []net.IP
330
}
331

332
type ipIndexEntry struct {
333
        ipNet net.IPNet
334
        keys  map[string]bool
335
}
336

337
type targetPort struct {
338
        proto v1.Protocol
339
        ports []int
340
}
341

342
type portIndexEntry struct {
343
        port              targetPort
344
        serviceKeys       map[string]bool
345
        networkPolicyKeys map[string]bool
346
}
347

348
type portRangeSnat struct {
349
        start int
350
        end   int
351
}
352

353
// EndPointData holds PodIF data in controller.
354
type EndPointData struct {
355
        MacAddr    string
356
        EPG        string
357
        Namespace  string
358
        AppProfile string
359
}
360

361
type ctrPortNameEntry struct {
362
        // Proto+port->pods
363
        ctrNmpToPods map[string]map[string]bool
364
}
365

366
type LinkData struct {
367
        Link []string
368
        Pods []string
369
}
370

371
type RoutedNodeData struct {
372
        addr string
373
        idx  int
374
}
375

376
type RoutedNetworkData struct {
377
        subnet       string
378
        netAddress   string
379
        maskLen      int
380
        numAllocated int
381
        maxAddresses int
382
        baseAddress  net.IP
383
        nodeMap      map[string]RoutedNodeData
384
        availableMap map[int]bool
385
}
386

387
type AdditionalNetworkMeta struct {
388
        NetworkName string
389
        EncapVlan   string
390
        //node+localiface->fabricLinks
391
        FabricLink map[string]map[string]LinkData
392
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
393
        Mode       util.EncapMode
394
}
395

396
type ServiceEndPointType interface {
397
        InitClientInformer(kubeClient *kubernetes.Clientset)
398
        Run(stopCh <-chan struct{})
399
        Wait(stopCh <-chan struct{})
400
        UpdateServicesForNode(nodename string)
401
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
402
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
403
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
404
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
405
}
406

407
type serviceEndpoint struct {
408
        cont *AciController
409
}
410
type serviceEndpointSlice struct {
411
        cont *AciController
412
}
413

414
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
415
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
416
}
×
417

418
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
419
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
420
}
×
421

422
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
423
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
424
}
1✔
425

426
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
427
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
428
}
1✔
429

430
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
431
        cache.WaitForCacheSync(stopCh,
1✔
432
                sep.cont.endpointsInformer.HasSynced,
1✔
433
                sep.cont.serviceInformer.HasSynced)
1✔
434
}
1✔
435

436
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
437
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
438
        cache.WaitForCacheSync(stopCh,
1✔
439
                seps.cont.endpointSliceInformer.HasSynced,
1✔
440
                seps.cont.serviceInformer.HasSynced)
1✔
441
}
1✔
442

443
func (e *ipIndexEntry) Network() net.IPNet {
1✔
444
        return e.ipNet
1✔
445
}
1✔
446

447
func newNodePodNetMeta() *nodePodNetMeta {
1✔
448
        return &nodePodNetMeta{
1✔
449
                nodePods: make(map[string]bool),
1✔
450
        }
1✔
451
}
1✔
452

453
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
454
        return workqueue.NewNamedRateLimitingQueue(
1✔
455
                workqueue.NewMaxOfRateLimiter(
1✔
456
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
457
                                10*time.Second),
1✔
458
                        &workqueue.BucketRateLimiter{
1✔
459
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
460
                        },
1✔
461
                ),
1✔
462
                "delta")
1✔
463
}
1✔
464

465
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
466
        cont := &AciController{
1✔
467
                log:          log,
1✔
468
                config:       config,
1✔
469
                env:          env,
1✔
470
                defaultEg:    "",
1✔
471
                defaultSg:    "",
1✔
472
                unitTestMode: unittestmode,
1✔
473

1✔
474
                podQueue:              createQueue("pod"),
1✔
475
                netPolQueue:           createQueue("networkPolicy"),
1✔
476
                qosQueue:              createQueue("qos"),
1✔
477
                netflowQueue:          createQueue("netflow"),
1✔
478
                erspanQueue:           createQueue("erspan"),
1✔
479
                serviceQueue:          createQueue("service"),
1✔
480
                snatQueue:             createQueue("snat"),
1✔
481
                snatNodeInfoQueue:     createQueue("snatnodeinfo"),
1✔
482
                rdConfigQueue:         createQueue("rdconfig"),
1✔
483
                istioQueue:            createQueue("istio"),
1✔
484
                nodeFabNetAttQueue:    createQueue("nodefabricnetworkattachment"),
1✔
485
                netFabConfigQueue:     createQueue("networkfabricconfiguration"),
1✔
486
                nadVlanMapQueue:       createQueue("nadvlanmap"),
1✔
487
                fabricVlanPoolQueue:   createQueue("fabricvlanpool"),
1✔
488
                netFabL3ConfigQueue:   createQueue("networkfabricl3configuration"),
1✔
489
                remIpContQueue:        createQueue("remoteIpContainer"),
1✔
490
                epgDnCacheUpdateQueue: createQueue("epgDnCache"),
1✔
491
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
492
                        &workqueue.BucketRateLimiter{
1✔
493
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
494
                        }, "sync"),
1✔
495

1✔
496
                configuredPodNetworkIps: newNetIps(),
1✔
497
                podNetworkIps:           newNetIps(),
1✔
498
                serviceIps:              ipam.NewIpCache(),
1✔
499
                staticServiceIps:        newNetIps(),
1✔
500
                nodeServiceIps:          newNetIps(),
1✔
501

1✔
502
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
503
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
504
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
505

1✔
506
                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
1✔
507
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
1✔
508
                serviceMetaCache:            make(map[string]*serviceMeta),
1✔
509
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
1✔
510
                snatServices:                make(map[string]bool),
1✔
511
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
1✔
512
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
1✔
513
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
1✔
514
                podIftoEp:                   make(map[string]*EndPointData),
1✔
515
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
516
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
1✔
517
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
1✔
518
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
1✔
519
                nmPortNp:                    make(map[string]bool),
1✔
520
                hppRef:                      make(map[string]hppReference),
1✔
521
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
1✔
522
                sharedEncapCache:            make(map[int]*sharedEncapData),
1✔
523
                sharedEncapAepCache:         make(map[string]map[int]bool),
1✔
524
                sharedEncapSviCache:         make(map[int]*NfL3Data),
1✔
525
                sharedEncapVrfCache:         make(map[string]*NfVrfData),
1✔
526
                sharedEncapTenantCache:      make(map[string]*NfTenantData),
1✔
527
                sharedEncapNfcCache:         make(map[int]*NfcData),
1✔
528
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
1✔
529
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
1✔
530
                sharedEncapNfcAppProfileMap: make(map[string]bool),
1✔
531
                sharedEncapLabelMap:         make(map[string][]int),
1✔
532
                lldpIfCache:                 make(map[string]*NfLLDPIfData),
1✔
533
                fabricVlanPoolMap:           make(map[string]map[string]string),
1✔
534
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
1✔
535
                hostFabricPathDnMap:         make(map[string]string),
1✔
536
                nsRemoteIpCont:              make(map[string]remoteIpConts),
1✔
537
        }
1✔
538
        cont.syncProcessors = map[string]func() bool{
1✔
539
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
540
                "rdConfig":       cont.syncRdConfig,
1✔
541
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
542
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
543
                   and aci-containers-operator images for istio.io/istio package
1✔
544

1✔
545
                "istioCR":        cont.createIstioCR,
1✔
546
                */
1✔
547
        }
1✔
548
        return cont
1✔
549
}
1✔
550

551
func (cont *AciController) Init() {
×
552
        if cont.config.ChainedMode {
×
553
                cont.log.Info("In chained mode")
×
554
        }
×
555

556
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
557
        if err != nil {
×
558
                cont.log.Error("Could not serialize default endpoint group")
×
559
                panic(err.Error())
×
560
        }
561
        cont.defaultEg = string(egdata)
×
562

×
563
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
564
        if err != nil {
×
565
                cont.log.Error("Could not serialize default security groups")
×
566
                panic(err.Error())
×
567
        }
568
        cont.defaultSg = string(sgdata)
×
569

×
570
        cont.log.Debug("Initializing IPAM")
×
571
        cont.initIpam()
×
572
        // check if the cluster supports endpoint slices
×
573
        // if cluster doesn't have the support fallback to endpoints
×
574
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
575
        if util.IsEndPointSlicesSupported(kubeClient) {
×
576
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
577
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
578
                cont.log.Info("Initializing ServiceEndpointSlices")
×
579
        } else {
×
580
                cont.serviceEndPoints = &serviceEndpoint{}
×
581
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
582
                cont.log.Info("Initializing ServiceEndpoints")
×
583
        }
×
584

585
        err = cont.env.Init(cont)
×
586
        if err != nil {
×
587
                panic(err.Error())
×
588
        }
589
}
590

591
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
592
        store cache.Store, handler func(interface{}) bool,
593
        deleteHandler func(string) bool,
594
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
595
        go wait.Until(func() {
2✔
596
                for {
2✔
597
                        key, quit := queue.Get()
1✔
598
                        if quit {
2✔
599
                                break
1✔
600
                        }
601

602
                        var requeue bool
1✔
603
                        switch key := key.(type) {
1✔
604
                        case chan struct{}:
×
605
                                close(key)
×
606
                        case string:
1✔
607
                                if strings.HasPrefix(key, "DELETED_") {
2✔
608
                                        delKey := strings.Trim(key, "DELETED_")
1✔
609
                                        requeue = deleteHandler(delKey)
1✔
610
                                } else {
2✔
611
                                        obj, exists, err := store.GetByKey(key)
1✔
612
                                        if err != nil {
1✔
613
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
614
                                        }
×
615
                                        //Handle Add/Update/Delete
616
                                        if exists && handler != nil {
2✔
617
                                                requeue = handler(obj)
1✔
618
                                        }
1✔
619
                                        //Handle Post Delete
620
                                        if !exists && postDelHandler != nil {
1✔
621
                                                requeue = postDelHandler()
×
622
                                        }
×
623
                                }
624
                        }
625
                        if requeue {
2✔
626
                                queue.AddRateLimited(key)
1✔
627
                        } else {
2✔
628
                                queue.Forget(key)
1✔
629
                        }
1✔
630
                        queue.Done(key)
1✔
631
                }
632
        }, time.Second, stopCh)
633
        <-stopCh
1✔
634
        queue.ShutDown()
1✔
635
}
636

637
func (cont *AciController) processRemIpContQueue(queue workqueue.RateLimitingInterface,
638
        handler func(interface{}) bool,
639
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
640
        go wait.Until(func() {
2✔
641
                for {
2✔
642
                        key, quit := queue.Get()
1✔
643
                        if quit {
2✔
644
                                break
1✔
645
                        }
646

647
                        var requeue bool
1✔
648
                        switch key := key.(type) {
1✔
649
                        case chan struct{}:
×
650
                                close(key)
×
651
                        case string:
1✔
652
                                if handler != nil {
2✔
653
                                        requeue = handler(key)
1✔
654
                                }
1✔
655
                                if postDelHandler != nil {
2✔
656
                                        requeue = postDelHandler()
1✔
657
                                }
1✔
658
                        }
659
                        if requeue {
1✔
660
                                queue.AddRateLimited(key)
×
661
                        } else {
1✔
662
                                queue.Forget(key)
1✔
663
                        }
1✔
664
                        queue.Done(key)
1✔
665

666
                }
667
        }, time.Second, stopCh)
668
        <-stopCh
1✔
669
        queue.ShutDown()
1✔
670
}
671

672
func (cont *AciController) processEpgDnCacheUpdateQueue(queue workqueue.RateLimitingInterface,
673
        handler func(interface{}) bool,
674
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
675
        go wait.Until(func() {
2✔
676
                for {
2✔
677
                        key, quit := queue.Get()
1✔
678
                        if quit {
2✔
679
                                break
1✔
680
                        }
681

682
                        var requeue bool
1✔
683
                        switch key := key.(type) {
1✔
684
                        case chan struct{}:
×
685
                                close(key)
×
686
                        case bool:
1✔
687
                                if handler != nil {
2✔
688
                                        requeue = handler(key)
1✔
689
                                }
1✔
690
                                if postDelHandler != nil {
1✔
691
                                        requeue = postDelHandler()
×
692
                                }
×
693
                        }
694
                        if requeue {
1✔
695
                                queue.AddRateLimited(key)
×
696
                        } else {
1✔
697
                                queue.Forget(key)
1✔
698
                        }
1✔
699
                        queue.Done(key)
1✔
700

701
                }
702
        }, time.Second, stopCh)
703
        <-stopCh
1✔
704
        queue.ShutDown()
1✔
705
}
706

707
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
708
        return apicapi.ApicSlice{}
1✔
709
}
1✔
710

711
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
712
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
713
}
1✔
714

715
func (cont *AciController) initStaticObjs() {
1✔
716
        cont.env.InitStaticAciObjects()
1✔
717
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
718
                cont.globalStaticObjs())
1✔
719
}
1✔
720

721
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
722
        vmmProv = "Kubernetes"
1✔
723
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
724
                vmmProv = "OpenShift"
×
725
        }
×
726
        return
1✔
727
}
728

729
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
730
        var err error
1✔
731
        var privKey []byte
1✔
732
        var apicCert []byte
1✔
733

1✔
734
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
735

1✔
736
        if cont.config.ApicPrivateKeyPath != "" {
1✔
737
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
738
                if err != nil {
×
739
                        panic(err)
×
740
                }
741
        }
742
        if cont.config.ApicCertPath != "" {
1✔
743
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
744
                if err != nil {
×
745
                        panic(err)
×
746
                }
747
        }
748
        // If not defined, default is 1800
749
        if cont.config.ApicRefreshTimer == "" {
2✔
750
                cont.config.ApicRefreshTimer = "1800"
1✔
751
        }
1✔
752
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
753
        if err != nil {
1✔
754
                panic(err)
×
755
        }
756
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
757

1✔
758
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
759
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
760
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
761
                panic(err)
×
762
        }
763

764
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
765
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
766
                cont.config.ApicRefreshTickerAdjust = "210"
1✔
767
        }
1✔
768
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
769
        if err != nil {
1✔
770
                panic(err)
×
771
        }
772

773
        //If ApicSubscriptionDelay is not defined, default to 100ms
774
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
775
                cont.config.ApicSubscriptionDelay = 100
1✔
776
        }
1✔
777
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
778

1✔
779
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
780
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
781
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
782
        }
1✔
783

784
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
785
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
786
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
787
        }
1✔
788
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
789

1✔
790
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
791
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
792
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
793
        }
1✔
794

795
        // If not defined, default to 32
796
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
797
                cont.config.PodIpPoolChunkSize = 32
1✔
798
        }
1✔
799
        if !cont.config.ChainedMode {
2✔
800
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
801
        }
1✔
802

803
        // If ApicConnectionRetryLimit is not defined, default to 5
804
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
805
                cont.config.ApicConnectionRetryLimit = 5
1✔
806
        }
1✔
807
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
808

1✔
809
        // If not valid, default to 5000-65000
1✔
810
        // other permissible values 1-65000
1✔
811
        defStart := 5000
1✔
812
        defEnd := 65000
1✔
813
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
814
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
815
        }
1✔
816
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
817
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
818
        }
1✔
819
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
820
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
821
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
822
                cont.config.SnatDefaultPortRangeStart = defStart
×
823
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
824
        }
×
825

826
        // Set default value for pbr programming delay if services list is not empty
827
        // and delay value is empty
828
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
829
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
830
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
831
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
832
        }
×
833
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
834
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
835
        }
×
836

837
        // Set contract scope for snat svc graph to global by default
838
        if cont.config.SnatSvcContractScope == "" {
2✔
839
                cont.config.SnatSvcContractScope = "global"
1✔
840
        }
1✔
841
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
842
                cont.config.MaxSvcGraphNodes = 32
1✔
843
        }
1✔
844
        if !cont.config.ChainedMode {
2✔
845
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
846
        }
1✔
847
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
848
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
849
                privKey, apicCert, cont.config.AciPrefix,
1✔
850
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
851
                cont.config.AciVrfTenant, cont.UpdateLLDPIfLocked)
1✔
852
        if err != nil {
1✔
853
                panic(err)
×
854
        }
855

856
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
857

1✔
858
        if len(cont.config.ApicHosts) != 0 {
1✔
859
        APIC_SWITCH:
×
860
                cont.log.WithFields(logrus.Fields{
×
861
                        "mod":  "APICAPI",
×
862
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
863
                }).Debug("Connecting to APIC to determine the Version")
×
864

×
865
                version, err := cont.apicConn.GetVersion()
×
866
                if err != nil {
×
867
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
868
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
869
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
870
                        goto APIC_SWITCH
×
871
                }
872
                cont.apicConn.CachedVersion = version
×
873
                apicapi.ApicVersion = version
×
874
                if version >= "4.2(4i)" {
×
875
                        cont.apicConn.SnatPbrFltrChain = true
×
876
                } else {
×
877
                        cont.apicConn.SnatPbrFltrChain = false
×
878
                }
×
879
                if version >= "5.2" {
×
880
                        cont.vmmClusterFaultSupported = true
×
881
                }
×
882
        } else { // For unit-tests
1✔
883
                cont.apicConn.SnatPbrFltrChain = true
1✔
884
        }
1✔
885

886
        if !cont.config.ChainedMode {
2✔
887
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
888
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
889
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
890
                        var expectedVrfRelations []string
×
891
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
892
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
893
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
894
                        if err != nil {
×
895
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
896
                                panic(err)
×
897
                        }
898
                }
899
        }
900

901
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.config.ChainedMode {
1✔
902
                //Clear fault instances when the controller starts
×
903
                cont.clearFaultInstances()
×
904
                //Subscribe for vmmEpPD for a given domain
×
905
                var tnTargetFilterEpg string
×
906
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
907
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
908
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
909
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
910
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
911
                        func(obj apicapi.ApicObject) bool {
×
912
                                cont.vmmEpPDChanged(obj)
×
913
                                return true
×
914
                        },
×
915
                        func(dn string) {
×
916
                                cont.vmmEpPDDeleted(dn)
×
917
                        })
×
918
        }
919

920
        cont.initStaticObjs()
1✔
921

1✔
922
        err = cont.env.PrepareRun(stopCh)
1✔
923
        if err != nil {
1✔
924
                panic(err.Error())
×
925
        }
926

927
        cont.apicConn.FullSyncHook = func() {
1✔
928
                // put a channel into each work queue and wait on it to
×
929
                // checkpoint object syncing in response to new subscription
×
930
                // updates
×
931
                cont.log.Debug("Starting checkpoint")
×
932
                var chans []chan struct{}
×
933
                qs := make([]workqueue.RateLimitingInterface, 0)
×
934
                _, ok := cont.env.(*K8sEnvironment)
×
935
                if ok {
×
936
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
937
                        if !cont.config.ChainedMode {
×
938
                                if !cont.config.DisableHppRendering {
×
939
                                        qs = append(qs, cont.netPolQueue)
×
940
                                }
×
941
                                if cont.config.EnableHppDirect {
×
942
                                        qs = append(qs, cont.remIpContQueue)
×
943
                                }
×
944
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
945
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
946
                                        cont.rdConfigQueue, cont.erspanQueue,
×
947
                                        cont.epgDnCacheUpdateQueue)
×
948
                        }
949
                }
950
                for _, q := range qs {
×
951
                        c := make(chan struct{})
×
952
                        chans = append(chans, c)
×
953
                        q.Add(c)
×
954
                }
×
955
                for _, c := range chans {
×
956
                        <-c
×
957
                }
×
958
                cont.log.Debug("Checkpoint complete")
×
959
        }
960

961
        if len(cont.config.ApicHosts) != 0 && !cont.config.ChainedMode {
1✔
962
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
963
                cont.scheduleRdConfig()
×
964
                if strings.Contains(cont.config.Flavor, "openstack") {
×
965
                        cont.setOpenStackSystemId()
×
966
                }
×
967
        }
968

969
        if !cont.config.ChainedMode {
2✔
970
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
971
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
972
                                []string{"hostprotPol"})
1✔
973
                }
1✔
974
        } else {
1✔
975
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
976
                        []string{"fvBD", "fvAp"})
1✔
977
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
978
                        []string{"fvnsVlanInstP"}, "")
1✔
979
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
980
                        []string{"infraRsDomP"}, "")
1✔
981
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
982
                        []string{"physDomP"}, "")
1✔
983
                cont.apicConn.AddSubscriptionClass("l3extDomP",
1✔
984
                        []string{"l3extDomP"}, "")
1✔
985
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
986
                        []string{"infraRsVlanNs"}, "")
1✔
987
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
988
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
989
                cont.apicConn.AddSubscriptionClass("l3extOut",
1✔
990
                        []string{"l3extInstP", "l3extSubnet", "fvRsCons", "fvRsProv", "l3extRsEctx", "l3extRsL3DomAtt", "l3extLNodeP", "l3extRsNodeL3OutAtt", "ipRouteP", "ipNexthopP", "l3extLIfP", "l3extVirtualLIfP", "l3extRsDynPathAtt",
1✔
991
                                "l3extRsPathL3OutAtt", "l3extMember", "l3extIp", "bgpExtP", "bgpPeerP", "bgpAsP", "bgpLocalAsnP", "bgpRsPeerPfxPol"}, "")
1✔
992
                cont.apicConn.AddSubscriptionClass("bgpPeerPfxPol",
1✔
993
                        []string{"bgpPeerPfxPol"}, "")
1✔
994
        }
1✔
995
        if !cont.config.ChainedMode {
2✔
996
                // When a new class is added for subscriptio, check if its name attribute
1✔
997
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
998
                // in apicapi.go
1✔
999
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
1000
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
1001
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
1002
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
1003
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
1004
                }
×
1005
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
1006
                        subscribeMo)
1✔
1007
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
1008
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
1009
                        []string{"fvRsCons"})
1✔
1010
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
1011
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
1012
                        cont.config.AciVmmController)
1✔
1013
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
1014
                // Since it is not supported for APIC versions < "5.0"
1✔
1015
                cont.addVmmInjectedLabel()
1✔
1016
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
1017
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
1018

1✔
1019
                var tnTargetFilter string
1✔
1020
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
1021
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
1022
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
1023
                        }
×
1024
                } else {
1✔
1025
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
1026
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
1027
                }
1✔
1028
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
1029
                        tnTargetFilter)
1✔
1030
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
1031
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
1032

1✔
1033
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
1034
                        func(obj apicapi.ApicObject) bool {
1✔
1035
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
1036
                                return true
×
1037
                        },
×
1038
                        func(dn string) {
×
1039
                                cont.SubnetDeleted(dn)
×
1040
                        })
×
1041

1042
                var opflexODevFilter string
1✔
1043
                if strings.Contains(cont.config.Flavor, "openstack") {
1✔
1044
                        opflexODevFilter = fmt.Sprintf("or(eq(opflexODev.domName,\"%s\"),wcard(opflexODev.compHvDn,\"prov-OpenStack\"))", cont.config.AciVmmDomain)
×
1045
                } else {
1✔
1046
                        opflexODevFilter = fmt.Sprintf("eq(opflexODev.domName,\"%s\")", cont.config.AciVmmDomain)
1✔
1047
                }
1✔
1048
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
1049
                        []string{"opflexODev"}, opflexODevFilter)
1✔
1050

1✔
1051
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
1052
                        func(obj apicapi.ApicObject) bool {
1✔
1053
                                cont.opflexDeviceChanged(obj)
×
1054
                                return true
×
1055
                        },
×
1056
                        func(dn string) {
×
1057
                                cont.opflexDeviceDeleted(dn)
×
1058
                        })
×
1059

1060
                if !cont.config.DisableServiceVlanPreprovisioning && !strings.Contains(cont.config.Flavor, "openstack") {
2✔
1061
                        if cont.config.AEP == "" {
2✔
1062
                                cont.log.Error("AEP is missing in configuration, preprovisioning of service vlan will be disabled")
1✔
1063
                        } else {
1✔
1064
                                infraRtAttEntPFilter := fmt.Sprintf("and(wcard(infraRtAttEntP.dn,\"/attentp-%s/\"))", cont.config.AEP)
×
1065
                                cont.apicConn.AddSubscriptionClass("infraRtAttEntP",
×
1066
                                        []string{"infraRtAttEntP"}, infraRtAttEntPFilter)
×
NEW
1067

×
NEW
1068
                                // For bare metal, the infraRtAttEntP associated with an AEP will be empty.
×
NEW
1069
                                // We should not receive any updates for such cases.
×
1070
                                cont.apicConn.SetSubscriptionHooks("infraRtAttEntP",
×
1071
                                        func(obj apicapi.ApicObject) bool {
×
1072
                                                cont.infraRtAttEntPChanged(obj)
×
1073
                                                return true
×
1074
                                        },
×
1075
                                        func(dn string) {
×
1076
                                                cont.infraRtAttEntPDeleted(dn)
×
1077
                                        })
×
1078
                        }
1079
                }
1080

1081
                cont.apicConn.VersionUpdateHook =
1✔
1082
                        func() {
1✔
1083
                                cont.initStaticServiceObjs()
×
1084
                        }
×
1085
        }
1086
        go cont.apicConn.Run(stopCh)
1✔
1087
}
1088

1089
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1090
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
1091
        ticker := time.NewTicker(seconds * time.Second)
1✔
1092
        defer ticker.Stop()
1✔
1093

1✔
1094
        for {
2✔
1095
                select {
1✔
1096
                case <-ticker.C:
1✔
1097
                        if cont.config.EnableOpflexAgentReconnect {
1✔
1098
                                cont.checkChangeOfOpflexOdevAciPod()
×
1099
                        }
×
1100
                        if cont.config.AciMultipod {
1✔
1101
                                cont.checkChangeOfOdevAciPod()
×
1102
                        }
×
1103
                case <-stopCh:
1✔
1104
                        return
1✔
1105
                }
1106
        }
1107
}
1108

1109
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1110
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
1111
        ticker := time.NewTicker(seconds * time.Second)
1✔
1112
        defer ticker.Stop()
1✔
1113

1✔
1114
        for {
2✔
1115
                select {
1✔
1116
                case <-ticker.C:
1✔
1117
                        cont.deleteOldOpflexDevices()
1✔
1118
                case <-stopCh:
1✔
1119
                        return
1✔
1120
                }
1121
        }
1122
}
1123

1124
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1125
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
1126
        ticker := time.NewTicker(seconds * time.Second)
1✔
1127
        defer ticker.Stop()
1✔
1128

1✔
1129
        for {
2✔
1130
                select {
1✔
1131
                case <-ticker.C:
1✔
1132
                        cont.processDelayedEpSlices()
1✔
1133
                case <-stopCh:
1✔
1134
                        return
1✔
1135
                }
1136
        }
1137
}
1138

1139
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
1140
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
1141
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
1142
        iteration := 0
1✔
1143
        for {
2✔
1144
                // To avoid noisy logs, only printing once in 5 minutes
1✔
1145
                if iteration%5 == 0 {
2✔
1146
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
1147
                }
1✔
1148
                var nodeInfos []*nodeinfo.NodeInfo
1✔
1149
                cont.indexMutex.Lock()
1✔
1150
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
1151
                        func(nodeInfoObj interface{}) {
2✔
1152
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
1153
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
1154
                        })
1✔
1155
                expectedmap := make(map[string]map[string]bool)
1✔
1156
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
1157
                        for nodename, entry := range glinfo {
2✔
1158
                                if _, found := expectedmap[nodename]; !found {
2✔
1159
                                        newentry := make(map[string]bool)
1✔
1160
                                        newentry[entry.SnatPolicyName] = true
1✔
1161
                                        expectedmap[nodename] = newentry
1✔
1162
                                } else {
2✔
1163
                                        currententry := expectedmap[nodename]
1✔
1164
                                        currententry[entry.SnatPolicyName] = true
1✔
1165
                                        expectedmap[nodename] = currententry
1✔
1166
                                }
1✔
1167
                        }
1168
                }
1169
                cont.indexMutex.Unlock()
1✔
1170

1✔
1171
                for _, value := range nodeInfos {
2✔
1172
                        marked := false
1✔
1173
                        policyNames := value.Spec.SnatPolicyNames
1✔
1174
                        nodeName := value.ObjectMeta.Name
1✔
1175
                        _, ok := expectedmap[nodeName]
1✔
1176
                        if !ok && len(policyNames) > 0 {
2✔
1177
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
1178
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
1179
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1180
                                marked = true
1✔
1181
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
1182
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1183
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1184
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1185
                                marked = true
1✔
1186
                        } else {
2✔
1187
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
1188
                                        // No snatpolicies present
×
1189
                                        continue
×
1190
                                }
1191
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
1192
                                if !eq {
2✔
1193
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1194
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1195
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1196
                                        marked = true
1✔
1197
                                }
1✔
1198
                        }
1199
                        if marked {
2✔
1200
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
1201
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
1202
                                if err != nil {
1✔
1203
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
1204
                                        continue
×
1205
                                }
1206
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1207
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1208
                        } else if iteration%5 == 0 {
2✔
1209
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1210
                        }
1✔
1211
                }
1212
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1213
                iteration++
1✔
1214
        }
1215
}
1216

1217
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1218
        queueStop <-chan struct{}) {
1✔
1219
        go wait.Until(func() {
2✔
1220
                for {
2✔
1221
                        syncType, quit := queue.Get()
1✔
1222
                        if quit {
2✔
1223
                                break
1✔
1224
                        }
1225
                        var requeue bool
1✔
1226
                        if sType, ok := syncType.(string); ok {
2✔
1227
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1228
                                        requeue = f()
1✔
1229
                                }
1✔
1230
                        }
1231
                        if requeue {
1✔
1232
                                queue.AddRateLimited(syncType)
×
1233
                        } else {
1✔
1234
                                queue.Forget(syncType)
1✔
1235
                        }
1✔
1236
                        queue.Done(syncType)
1✔
1237
                }
1238
        }, time.Second, queueStop)
1239
        <-queueStop
1✔
1240
        queue.ShutDown()
1✔
1241
}
1242

1243
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1244
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1245
}
1✔
1246
func (cont *AciController) scheduleRdConfig() {
×
1247
        cont.syncQueue.AddRateLimited("rdConfig")
×
1248
}
×
1249
func (cont *AciController) scheduleCreateIstioCR() {
×
1250
        cont.syncQueue.AddRateLimited("istioCR")
×
1251
}
×
1252

1253
func (cont *AciController) addVmmInjectedLabel() {
1✔
1254
        if apicapi.ApicVersion >= "5.2" {
1✔
1255
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1256
                if err != nil {
×
1257
                        panic(err.Error())
×
1258
                }
1259
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1260
                if err != nil {
×
1261
                        panic(err.Error())
×
1262
                }
1263
        }
1264
        if apicapi.ApicVersion >= "5.0" {
2✔
1265
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1266
                if err != nil {
1✔
1267
                        panic(err.Error())
×
1268
                }
1269
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1270
                if err != nil {
1✔
1271
                        panic(err.Error())
×
1272
                }
1273
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1274
                if err != nil {
1✔
1275
                        panic(err.Error())
×
1276
                }
1277
        }
1278
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc