• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 9963

25 Sep 2024 03:22PM UTC coverage: 69.55% (-0.3%) from 69.882%
9963

push

travis-pro

web-flow
Merge pull request #1404 from noironetworks/backport_lldpif_updates

LLDPIf Cache updates

73 of 197 new or added lines in 3 files covered. (37.06%)

10 existing lines in 2 files now uncovered.

13138 of 18890 relevant lines covered (69.55%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.35
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        v1net "k8s.io/api/networking/v1"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/util/workqueue"
40

41
        "github.com/noironetworks/aci-containers/pkg/apicapi"
42
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
43
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
44
        hppv1 "github.com/noironetworks/aci-containers/pkg/hpp/apis/aci.hpp/v1"
45
        "github.com/noironetworks/aci-containers/pkg/index"
46
        "github.com/noironetworks/aci-containers/pkg/ipam"
47
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
48
        "github.com/noironetworks/aci-containers/pkg/metadata"
49
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
50
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
51
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
52
        "github.com/noironetworks/aci-containers/pkg/util"
53
)
54

55
// podUpdateFunc takes a Pod and returns a Pod and an error; it is the type
// of the AciController.updatePod hook.
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)

// nodeUpdateFunc takes a Node and returns a Node and an error; it is the
// type of the AciController.updateNode hook.
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)

// serviceUpdateFunc takes a Service and returns a Service and an error; it
// is the type of the AciController.updateServiceStatus hook.
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)

// listNetworkPoliciesFunc returns a NetworkPolicyList for a string argument
// (presumably a namespace -- confirm against the implementation).
type listNetworkPoliciesFunc func(string) (*v1net.NetworkPolicyList, error)

// listNamespacesFunc returns a NamespaceList; it takes no arguments.
type listNamespacesFunc func() (*v1.NamespaceList, error)
60

61
// AciController is the controller's central state object. It bundles the
// logger, configuration and environment with the per-resource work queues,
// the Kubernetes indexers/informers and hooks, and the in-memory caches and
// indexes declared below. Instances are built by NewController.
type AciController struct {
        log    *logrus.Logger
        config *ControllerConfig
        env    Environment

        // JSON-serialized forms of config.DefaultEg and config.DefaultSg;
        // empty after NewController and filled in by Init.
        defaultEg string
        defaultSg string

        // set from the unittestmode argument of NewController
        unitTestMode bool

        // rate-limited work queues, one per resource kind; created in NewController
        podQueue              workqueue.RateLimitingInterface
        netPolQueue           workqueue.RateLimitingInterface
        qosQueue              workqueue.RateLimitingInterface
        serviceQueue          workqueue.RateLimitingInterface
        snatQueue             workqueue.RateLimitingInterface
        netflowQueue          workqueue.RateLimitingInterface
        erspanQueue           workqueue.RateLimitingInterface
        snatNodeInfoQueue     workqueue.RateLimitingInterface
        rdConfigQueue         workqueue.RateLimitingInterface
        istioQueue            workqueue.RateLimitingInterface
        nodeFabNetAttQueue    workqueue.RateLimitingInterface
        netFabConfigQueue     workqueue.RateLimitingInterface
        nadVlanMapQueue       workqueue.RateLimitingInterface
        fabricVlanPoolQueue   workqueue.RateLimitingInterface
        netFabL3ConfigQueue   workqueue.RateLimitingInterface
        remIpContQueue        workqueue.RateLimitingInterface
        epgDnCacheUpdateQueue workqueue.RateLimitingInterface
        lldpIfQueue           workqueue.RateLimitingInterface

        // indexers, informers, update/list function hooks and the fabric
        // attachment clientset; none of these are set by NewController
        namespaceIndexer                     cache.Indexer
        namespaceInformer                    cache.Controller
        podIndexer                           cache.Indexer
        podInformer                          cache.Controller
        endpointsIndexer                     cache.Indexer
        endpointsInformer                    cache.Controller
        serviceIndexer                       cache.Indexer
        serviceInformer                      cache.Controller
        replicaSetIndexer                    cache.Indexer
        replicaSetInformer                   cache.Controller
        deploymentIndexer                    cache.Indexer
        deploymentInformer                   cache.Controller
        nodeIndexer                          cache.Indexer
        nodeInformer                         cache.Controller
        networkPolicyIndexer                 cache.Indexer
        networkPolicyInformer                cache.Controller
        snatIndexer                          cache.Indexer
        snatInformer                         cache.Controller
        snatNodeInfoIndexer                  cache.Indexer
        snatNodeInformer                     cache.Controller
        crdInformer                          cache.Controller
        rdConfigInformer                     cache.Controller
        rdConfigIndexer                      cache.Indexer
        qosIndexer                           cache.Indexer
        qosInformer                          cache.Controller
        netflowIndexer                       cache.Indexer
        netflowInformer                      cache.Controller
        erspanIndexer                        cache.Indexer
        erspanInformer                       cache.Controller
        nodePodIfIndexer                     cache.Indexer
        nodePodIfInformer                    cache.Controller
        istioIndexer                         cache.Indexer
        istioInformer                        cache.Controller
        endpointSliceIndexer                 cache.Indexer
        endpointSliceInformer                cache.Controller
        snatCfgInformer                      cache.Controller
        updatePod                            podUpdateFunc
        updateNode                           nodeUpdateFunc
        updateServiceStatus                  serviceUpdateFunc
        listNetworkPolicies                  listNetworkPoliciesFunc
        listNamespaces                       listNamespacesFunc
        nodeFabNetAttInformer                cache.SharedIndexInformer
        netFabConfigInformer                 cache.SharedIndexInformer
        nadVlanMapInformer                   cache.SharedIndexInformer
        fabricVlanPoolInformer               cache.SharedIndexInformer
        networkFabricL3ConfigurationInformer cache.SharedIndexInformer
        fabNetAttClient                      *fabattclset.Clientset
        proactiveConfInformer                cache.SharedIndexInformer

        // NOTE(review): which fields each mutex guards is not visible in this
        // part of the file -- confirm at the lock sites.
        indexMutex sync.Mutex
        hppMutex   sync.Mutex

        // IP pools/caches; allocated (empty) in NewController
        configuredPodNetworkIps *netIps
        podNetworkIps           *netIps
        serviceIps              *ipam.IpCache
        staticServiceIps        *netIps
        nodeServiceIps          *netIps

        // index of pods matched by deployments
        depPods *index.PodSelectorIndex
        // index of pods matched by network policies
        netPolPods *index.PodSelectorIndex
        // index of pods matched by network policy ingress rules
        netPolIngressPods *index.PodSelectorIndex
        // index of pods matched by network policy egress rules
        netPolEgressPods *index.PodSelectorIndex
        // index of IP addresses contained in endpoints objects
        endpointsIpIndex cidranger.Ranger
        // index of service target ports
        targetPortIndex map[string]*portIndexEntry
        // index of ip blocks referenced by network policy egress rules
        netPolSubnetIndex cidranger.Ranger
        // index of pods matched by erspan policies
        erspanPolPods *index.PodSelectorIndex

        // connection to the APIC; not set by NewController
        apicConn *apicapi.ApicConnection

        // map-based caches; all except delayedEpSlices are allocated in NewController
        nodeServiceMetaCache map[string]*nodeServiceMeta
        nodeACIPod           map[string]aciPodAnnot
        nodeACIPodAnnot      map[string]aciPodAnnot
        nodeOpflexDevice     map[string]apicapi.ApicSlice
        nodePodNetCache      map[string]*nodePodNetMeta
        serviceMetaCache     map[string]*serviceMeta
        snatPolicyCache      map[string]*ContSnatPolicy
        delayedEpSlices      []*DelayedEpSlice
        snatServices         map[string]bool
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
        rdConfigCache        map[string]*rdConfig.RdConfig
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
        istioCache           map[string]*istiov1.AciIstioOperator
        podIftoEp            map[string]*EndPointData
        // Node Name and Policy Name
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
        nodeSyncEnabled     bool
        serviceSyncEnabled  bool
        snatSyncEnabled     bool
        syncQueue           workqueue.RateLimitingInterface
        syncProcessors      map[string]func() bool
        serviceEndPoints    ServiceEndPointType
        crdHandlers         map[string]func(*AciController, <-chan struct{})
        stopCh              <-chan struct{}
        //index of containerportname to ctrPortNameEntry
        ctrPortNameCache map[string]*ctrPortNameEntry
        // named networkPolicies
        nmPortNp map[string]bool
        //maps network policy hash to hpp
        hppRef map[string]hppReference
        //map for ns to remoteIpCont
        nsRemoteIpCont map[string]remoteIpCont
        // cache to look for Epg DNs which are bound to Vmm domain
        cachedEpgDns             []string
        vmmClusterFaultSupported bool
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
        //Used in Shared mode
        sharedEncapCache       map[int]*sharedEncapData
        sharedEncapAepCache    map[string]map[int]bool
        sharedEncapSviCache    map[int]*NfL3Data
        sharedEncapVrfCache    map[string]*NfVrfData
        sharedEncapTenantCache map[string]*NfTenantData
        nfl3configGenerationId int64
        // vlan to propertiesList
        sharedEncapNfcCache         map[int]*NfcData
        sharedEncapNfcVlanMap       map[int]*NfcData
        sharedEncapNfcLabelMap      map[string]*NfcData
        sharedEncapNfcAppProfileMap map[string]bool
        // nadVlanMap encapLabel to vlan
        sharedEncapLabelMap      map[string][]int
        lldpIfCache              map[string]*NfLLDPIfData
        globalVlanConfig         globalVlanConfig
        fabricVlanPoolMap        map[string]map[string]string
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
        openStackSystemId        string
}
223

224
// NfLLDPIfData is the value type of AciController.lldpIfCache: an LLDP
// interface string together with a set of references to it.
type NfLLDPIfData struct {
        LLDPIf string
        // As of now, manage at the NAD level
        // more granular introduces intf tracking complexities
        // for not sufficient benefits
        // (presumably Refs is therefore keyed per NAD -- confirm at the writers)
        Refs map[string]bool
}
231

232
// NfL3OutData is the value type of NfTenantData.L3OutConfig. It holds a
// route-control string, a pod id, and maps of router nodes, external-EPG
// prefix groups and SVIs. The meaning of the int/string map keys is not
// visible here -- confirm against the code that fills them.
type NfL3OutData struct {
        // +kubebuilder:validation:Enum:"import"
        RtCtrl     string
        PodId      int
        RtrNodeMap map[int]*fabattv1.FabricL3OutRtrNode
        ExtEpgMap  map[string]*fabattv1.PolicyPrefixGroup
        SviMap     map[int]bool
}
240

241
// NfTenantData is the value type of AciController.sharedEncapTenantCache and
// of NfVrfData.TenantConfig. It records a CommonTenant flag plus the L3Out
// data and BGP peer prefix policies held for the tenant, each in a
// string-keyed map.
type NfTenantData struct {
        CommonTenant     bool
        L3OutConfig      map[string]*NfL3OutData
        BGPPeerPfxConfig map[string]*fabattv1.BGPPeerPrefixPolicy
}
246

247
// NfVrfData is the value type of AciController.sharedEncapVrfCache; it maps
// a string key (presumably a tenant name -- confirm) to that tenant's data.
type NfVrfData struct {
        TenantConfig map[string]*NfTenantData
}
250

251
// NfL3Networks embeds a fabattv1.PrimaryNetwork and adds a string-keyed map
// of fabric L3 subnets. It is referenced by NfL3Data.ConnectedNw.
type NfL3Networks struct {
        fabattv1.PrimaryNetwork
        Subnets map[string]*fabattv1.FabricL3Subnet
}
255

256
// NfL3Data is the value type of AciController.sharedEncapSviCache. It
// carries the tenant, VRF and pod id, the connected network, and maps of
// routed-network address data and fabric L3Out nodes.
type NfL3Data struct {
        Tenant      string
        Vrf         fabattv1.VRF
        PodId       int
        ConnectedNw *NfL3Networks
        NetAddr     map[string]*RoutedNetworkData
        Nodes       map[int]fabattv1.FabricL3OutNode
}
264

265
// remoteIpCont maps an IP to that pod's labels (label key -> label value).
// AciController.nsRemoteIpCont keeps one remoteIpCont per namespace.
type remoteIpCont map[string]map[string]string
267

268
// NfcData is the value type of the AciController sharedEncapNfc* caches: a
// set of AEPs (string -> bool) together with a fabattv1.Epg.
type NfcData struct {
        Aeps map[string]bool
        Epg  fabattv1.Epg
}
272

273
// sharedEncapData is the value type of AciController.sharedEncapCache
// (int-keyed; the AciController field comment says it is used in Shared
// mode). It tracks the pods, additional-network references and AEPs held
// for one cache entry.
type sharedEncapData struct {
        //node to NAD to pods
        Pods   map[string]map[string][]string
        NetRef map[string]*AdditionalNetworkMeta
        Aeps   map[string]bool
}
279

280
// globalVlanConfig holds two APIC objects, one named for the shared physical
// domain and one for the shared L3 domain. It is stored by value in
// AciController.globalVlanConfig.
type globalVlanConfig struct {
        SharedPhysDom apicapi.ApicObject
        SharedL3Dom   apicapi.ApicObject
}
284

285
// hppReference is the value type of AciController.hppRef, which maps a
// network policy hash to its hpp. It is JSON-serializable: every field has a
// json tag with omitempty.
type hppReference struct {
        RefCount uint              `json:"ref-count,omitempty"`
        Npkeys   []string          `json:"npkeys,omitempty"`
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
        HppCr    hppv1.HostprotPol `json:"hpp-cr,omitempty"`
}
291

292
// DelayedEpSlice is the element type of AciController.delayedEpSlices. It
// pairs a service key with an old and a new EndpointSlice and a timestamp.
// NOTE(review): whether DelayedTime is the enqueue time or the due time is
// not visible here -- confirm at the use sites.
type DelayedEpSlice struct {
        ServiceKey  string
        OldEpSlice  *discovery.EndpointSlice
        NewEpSlice  *discovery.EndpointSlice
        DelayedTime time.Time
}
298

299
// aciPodAnnot is the value type of AciController.nodeACIPod and
// AciController.nodeACIPodAnnot. It records an ACI pod string, a
// single-opflex-odev flag, and disconnect / connect / last-error timestamps.
type aciPodAnnot struct {
        aciPod             string
        isSingleOpflexOdev bool
        disconnectTime     time.Time
        connectTime        time.Time
        lastErrorTime      time.Time
}
306

307
// nodeServiceMeta is the value type of AciController.nodeServiceMetaCache;
// it wraps a single metadata.ServiceEndpoint.
type nodeServiceMeta struct {
        serviceEp metadata.ServiceEndpoint
}
310

311
// nodePodNetMeta is the value type of AciController.nodePodNetCache. It
// holds a set of pods (string -> bool), the pod network IPs, and a string
// form of those IPs (presumably the annotation value -- confirm).
// newNodePodNetMeta returns one with nodePods allocated.
type nodePodNetMeta struct {
        nodePods            map[string]bool
        podNetIps           metadata.NetIps
        podNetIpsAnnotation string
}
316

317
// openstackOpflexOdevInfo is the value type of
// AciController.openStackFabricPathDnMap: a set of opflex ODev DNs
// (map to empty struct) and a fabric path DN string.
type openstackOpflexOdevInfo struct {
        opflexODevDn map[string]struct{}
        fabricPathDn string
}
321

322
// serviceMeta is the value type of AciController.serviceMetaCache. It keeps
// three IP lists for a service: requested, ingress and static ingress.
type serviceMeta struct {
        requestedIps     []net.IP
        ingressIps       []net.IP
        staticIngressIps []net.IP
}
327

328
// ipIndexEntry pairs an IP network with a set of string keys. Its Network
// method returns ipNet (presumably to satisfy cidranger's entry interface
// for the Ranger indexes on AciController -- confirm).
type ipIndexEntry struct {
        ipNet net.IPNet
        keys  map[string]bool
}
332

333
// targetPort is a protocol together with a list of port numbers; it is
// embedded by value in portIndexEntry.port.
type targetPort struct {
        proto v1.Protocol
        ports []int
}
337

338
// portIndexEntry is the value type of AciController.targetPortIndex (the
// index of service target ports). It holds a target port and the sets of
// service keys and network policy keys associated with it.
type portIndexEntry struct {
        port              targetPort
        serviceKeys       map[string]bool
        networkPolicyKeys map[string]bool
}
343

344
// portRangeSnat is a start/end pair of ints describing a port range.
// NOTE(review): whether end is inclusive is not visible here -- confirm.
type portRangeSnat struct {
        start int
        end   int
}
348

349
// EndPointData holds PodIF data in controller: the MAC address, EPG,
// namespace and application profile, all as strings. It is the value type
// of AciController.podIftoEp.
type EndPointData struct {
        MacAddr    string
        EPG        string
        Namespace  string
        AppProfile string
}
356

357
// ctrPortNameEntry is the value type of AciController.ctrPortNameCache,
// which is indexed by container port name.
type ctrPortNameEntry struct {
        // Proto+port->pods
        ctrNmpToPods map[string]map[string]bool
}
361

362
// LinkData is the innermost value type of AdditionalNetworkMeta.FabricLink:
// a list of link strings and a list of pod strings.
type LinkData struct {
        Link []string
        Pods []string
}
366

367
// RoutedNodeData is the value type of RoutedNetworkData.nodeMap: an address
// string and an int index (presumably the slot handed to the node from
// RoutedNetworkData.availableMap -- confirm).
type RoutedNodeData struct {
        addr string
        idx  int
}
371

372
// RoutedNetworkData is the value type of NfL3Data.NetAddr. It describes one
// routed subnet (subnet string, network address, mask length, base address)
// and its allocation bookkeeping: allocated and maximum address counts, a
// per-node map of RoutedNodeData, and an int-keyed availability map.
type RoutedNetworkData struct {
        subnet       string
        netAddress   string
        maskLen      int
        numAllocated int
        maxAddresses int
        baseAddress  net.IP
        nodeMap      map[string]RoutedNodeData
        availableMap map[int]bool
}
382

383
// AdditionalNetworkMeta is the value type of
// AciController.additionalNetworkCache and of sharedEncapData.NetRef. It
// records a network's name, its encap VLAN (as a string), its fabric links,
// a string-keyed cache of NodeFabricNetworkAttachment objects, and the
// encap mode.
type AdditionalNetworkMeta struct {
        NetworkName string
        EncapVlan   string
        //node+localiface->fabricLinks
        FabricLink map[string]map[string]LinkData
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
        Mode       util.EncapMode
}
391

392
// ServiceEndPointType is the interface behind AciController.serviceEndPoints.
// Init assigns either a *serviceEndpointSlice or a *serviceEndpoint to that
// field, depending on whether the cluster supports endpoint slices.
type ServiceEndPointType interface {
        // InitClientInformer sets up the implementation's informer from kubeClient.
        InitClientInformer(kubeClient *kubernetes.Clientset)
        // Run starts the implementation's informer; it runs until stopCh is closed.
        Run(stopCh <-chan struct{})
        // Wait waits for the implementation's informer cache and the service
        // informer cache to sync.
        Wait(stopCh <-chan struct{})
        UpdateServicesForNode(nodename string)
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
}
402

403
// serviceEndpoint is the ServiceEndPointType implementation that uses the
// controller's endpoints informer; Init selects it when endpoint slices are
// not supported. cont points back at the owning controller.
type serviceEndpoint struct {
        cont *AciController
}

// serviceEndpointSlice is the ServiceEndPointType implementation that uses
// the controller's endpoint slice informer; Init selects it when endpoint
// slices are supported. cont points back at the owning controller.
type serviceEndpointSlice struct {
        cont *AciController
}
409

410
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
411
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
412
}
×
413

414
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
415
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
416
}
×
417

418
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
419
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
420
}
1✔
421

422
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
423
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
424
}
1✔
425

426
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
427
        cache.WaitForCacheSync(stopCh,
1✔
428
                sep.cont.endpointsInformer.HasSynced,
1✔
429
                sep.cont.serviceInformer.HasSynced)
1✔
430
}
1✔
431

432
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
433
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
434
        cache.WaitForCacheSync(stopCh,
1✔
435
                seps.cont.endpointSliceInformer.HasSynced,
1✔
436
                seps.cont.serviceInformer.HasSynced)
1✔
437
}
1✔
438

439
func (e *ipIndexEntry) Network() net.IPNet {
1✔
440
        return e.ipNet
1✔
441
}
1✔
442

443
func newNodePodNetMeta() *nodePodNetMeta {
1✔
444
        return &nodePodNetMeta{
1✔
445
                nodePods: make(map[string]bool),
1✔
446
        }
1✔
447
}
1✔
448

449
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
450
        return workqueue.NewNamedRateLimitingQueue(
1✔
451
                workqueue.NewMaxOfRateLimiter(
1✔
452
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
453
                                10*time.Second),
1✔
454
                        &workqueue.BucketRateLimiter{
1✔
455
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
456
                        },
1✔
457
                ),
1✔
458
                "delta")
1✔
459
}
1✔
460

461
func createQueueWithEventLimits(name string, eventsPerSec float64, burstSize int) workqueue.RateLimitingInterface {
1✔
462
        return workqueue.NewNamedRateLimitingQueue(
1✔
463
                workqueue.NewMaxOfRateLimiter(
1✔
464
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
465
                                10*time.Second),
1✔
466
                        &workqueue.BucketRateLimiter{
1✔
467
                                Limiter: rate.NewLimiter(rate.Limit(eventsPerSec), int(burstSize)),
1✔
468
                        },
1✔
469
                ),
1✔
470
                name)
1✔
471
}
1✔
472

473
// NewController allocates an AciController for the given configuration,
// environment and logger. It creates all work queues, allocates the IP
// caches and the map-based caches as empty values, and registers the
// "snatGlobalInfo" and "rdConfig" sync processors. unittestmode is stored in
// the controller's unitTestMode field. Informers, indexers and the APIC
// connection are not set here.
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
        cont := &AciController{
                log:          log,
                config:       config,
                env:          env,
                defaultEg:    "",
                defaultSg:    "",
                unitTestMode: unittestmode,

                podQueue:              createQueue("pod"),
                netPolQueue:           createQueue("networkPolicy"),
                qosQueue:              createQueue("qos"),
                netflowQueue:          createQueue("netflow"),
                erspanQueue:           createQueue("erspan"),
                serviceQueue:          createQueue("service"),
                snatQueue:             createQueue("snat"),
                snatNodeInfoQueue:     createQueue("snatnodeinfo"),
                rdConfigQueue:         createQueue("rdconfig"),
                istioQueue:            createQueue("istio"),
                nodeFabNetAttQueue:    createQueue("nodefabricnetworkattachment"),
                netFabConfigQueue:     createQueue("networkfabricconfiguration"),
                nadVlanMapQueue:       createQueue("nadvlanmap"),
                fabricVlanPoolQueue:   createQueue("fabricvlanpool"),
                netFabL3ConfigQueue:   createQueue("networkfabricl3configuration"),
                remIpContQueue:        createQueue("remoteIpContainer"),
                epgDnCacheUpdateQueue: createQueue("epgDnCache"),
                lldpIfQueue:           createQueueWithEventLimits("lldpIf", 5, 200),
                syncQueue: workqueue.NewNamedRateLimitingQueue(
                        &workqueue.BucketRateLimiter{
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
                        }, "sync"),

                configuredPodNetworkIps: newNetIps(),
                podNetworkIps:           newNetIps(),
                serviceIps:              ipam.NewIpCache(),
                staticServiceIps:        newNetIps(),
                nodeServiceIps:          newNetIps(),

                nodeACIPod:       make(map[string]aciPodAnnot),
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),

                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
                serviceMetaCache:            make(map[string]*serviceMeta),
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
                snatServices:                make(map[string]bool),
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
                podIftoEp:                   make(map[string]*EndPointData),
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
                nmPortNp:                    make(map[string]bool),
                hppRef:                      make(map[string]hppReference),
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
                sharedEncapCache:            make(map[int]*sharedEncapData),
                sharedEncapAepCache:         make(map[string]map[int]bool),
                sharedEncapSviCache:         make(map[int]*NfL3Data),
                sharedEncapVrfCache:         make(map[string]*NfVrfData),
                sharedEncapTenantCache:      make(map[string]*NfTenantData),
                sharedEncapNfcCache:         make(map[int]*NfcData),
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
                sharedEncapNfcAppProfileMap: make(map[string]bool),
                sharedEncapLabelMap:         make(map[string][]int),
                lldpIfCache:                 make(map[string]*NfLLDPIfData),
                fabricVlanPoolMap:           make(map[string]map[string]string),
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
                nsRemoteIpCont:              make(map[string]remoteIpCont),
        }
        // The processors are bound method values, so they can only be
        // registered once cont exists.
        cont.syncProcessors = map[string]func() bool{
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
                "rdConfig":       cont.syncRdConfig,
                /* Commenting code to remove dependency from istio.io/istio package.
                   Vulnerabilities were detected by quay.io security scan of aci-containers-controller
                   and aci-containers-operator images for istio.io/istio package

                "istioCR":        cont.createIstioCR,
                */
        }
        return cont
}
1✔
558

559
func (cont *AciController) Init() {
×
560
        if cont.config.ChainedMode {
×
561
                cont.log.Info("In chained mode")
×
562
        }
×
563

564
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
565
        if err != nil {
×
566
                cont.log.Error("Could not serialize default endpoint group")
×
567
                panic(err.Error())
×
568
        }
569
        cont.defaultEg = string(egdata)
×
570

×
571
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
572
        if err != nil {
×
573
                cont.log.Error("Could not serialize default security groups")
×
574
                panic(err.Error())
×
575
        }
576
        cont.defaultSg = string(sgdata)
×
577

×
578
        cont.log.Debug("Initializing IPAM")
×
579
        cont.initIpam()
×
580
        // check if the cluster supports endpoint slices
×
581
        // if cluster doesn't have the support fallback to endpoints
×
582
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
583
        if util.IsEndPointSlicesSupported(kubeClient) {
×
584
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
585
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
586
                cont.log.Info("Initializing ServiceEndpointSlices")
×
587
        } else {
×
588
                cont.serviceEndPoints = &serviceEndpoint{}
×
589
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
590
                cont.log.Info("Initializing ServiceEndpoints")
×
591
        }
×
592

593
        err = cont.env.Init(cont)
×
594
        if err != nil {
×
595
                panic(err.Error())
×
596
        }
597
}
598

599
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
600
        store cache.Store, handler func(interface{}) bool,
601
        deleteHandler func(string) bool,
602
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
603
        go wait.Until(func() {
2✔
604
                for {
2✔
605
                        key, quit := queue.Get()
1✔
606
                        if quit {
2✔
607
                                break
1✔
608
                        }
609

610
                        var requeue bool
1✔
611
                        switch key := key.(type) {
1✔
612
                        case chan struct{}:
×
613
                                close(key)
×
614
                        case string:
1✔
615
                                if strings.HasPrefix(key, "DELETED_") {
2✔
616
                                        delKey := strings.Trim(key, "DELETED_")
1✔
617
                                        requeue = deleteHandler(delKey)
1✔
618
                                } else {
2✔
619
                                        obj, exists, err := store.GetByKey(key)
1✔
620
                                        if err != nil {
1✔
621
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
622
                                        }
×
623
                                        //Handle Add/Update/Delete
624
                                        if exists && handler != nil {
2✔
625
                                                requeue = handler(obj)
1✔
626
                                        }
1✔
627
                                        //Handle Post Delete
628
                                        if !exists && postDelHandler != nil {
1✔
629
                                                requeue = postDelHandler()
×
630
                                        }
×
631
                                }
632
                        }
633
                        if requeue {
2✔
634
                                queue.AddRateLimited(key)
1✔
635
                        } else {
2✔
636
                                queue.Forget(key)
1✔
637
                        }
1✔
638
                        queue.Done(key)
1✔
639
                }
640
        }, time.Second, stopCh)
641
        <-stopCh
1✔
642
        queue.ShutDown()
1✔
643
}
644

645
func (cont *AciController) processRemIpContQueue(queue workqueue.RateLimitingInterface,
646
        handler func(interface{}) bool,
647
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
648
        go wait.Until(func() {
2✔
649
                for {
2✔
650
                        key, quit := queue.Get()
1✔
651
                        if quit {
2✔
652
                                break
1✔
653
                        }
654

655
                        var requeue bool
1✔
656
                        switch key := key.(type) {
1✔
657
                        case chan struct{}:
×
658
                                close(key)
×
659
                        case string:
1✔
660
                                if handler != nil {
2✔
661
                                        requeue = handler(key)
1✔
662
                                }
1✔
663
                                if postDelHandler != nil {
2✔
664
                                        requeue = postDelHandler()
1✔
665
                                }
1✔
666
                        }
667
                        if requeue {
1✔
668
                                queue.AddRateLimited(key)
×
669
                        } else {
1✔
670
                                queue.Forget(key)
1✔
671
                        }
1✔
672
                        queue.Done(key)
1✔
673

674
                }
675
        }, time.Second, stopCh)
676
        <-stopCh
1✔
677
        queue.ShutDown()
1✔
678
}
679

680
func (cont *AciController) processEpgDnCacheUpdateQueue(queue workqueue.RateLimitingInterface,
681
        handler func(interface{}) bool,
682
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
683
        go wait.Until(func() {
2✔
684
                for {
2✔
685
                        key, quit := queue.Get()
1✔
686
                        if quit {
2✔
687
                                break
1✔
688
                        }
689

690
                        var requeue bool
1✔
691
                        switch key := key.(type) {
1✔
692
                        case chan struct{}:
×
693
                                close(key)
×
694
                        case bool:
1✔
695
                                if handler != nil {
2✔
696
                                        requeue = handler(key)
1✔
697
                                }
1✔
698
                                if postDelHandler != nil {
1✔
699
                                        requeue = postDelHandler()
×
700
                                }
×
701
                        }
702
                        if requeue {
1✔
703
                                queue.AddRateLimited(key)
×
704
                        } else {
1✔
705
                                queue.Forget(key)
1✔
706
                        }
1✔
707
                        queue.Done(key)
1✔
708

709
                }
710
        }, time.Second, stopCh)
711
        <-stopCh
1✔
712
        queue.ShutDown()
1✔
713
}
714

715
func (cont *AciController) processLLDPIfQueue(queue workqueue.RateLimitingInterface,
716
        handler func(interface{}) bool, stopCh <-chan struct{}) {
1✔
717
        go wait.Until(func() {
2✔
718
                for {
2✔
719
                        key, quit := queue.Get()
1✔
720
                        if quit {
2✔
721
                                break
1✔
722
                        }
NEW
723
                        var requeue bool
×
NEW
724
                        switch key := key.(type) {
×
NEW
725
                        case chan struct{}:
×
NEW
726
                                close(key)
×
NEW
727
                        case string:
×
NEW
728
                                if handler != nil {
×
NEW
729
                                        requeue = handler(key)
×
NEW
730
                                }
×
731
                        }
NEW
732
                        if requeue {
×
NEW
733
                                queue.AddRateLimited(key)
×
NEW
734
                        } else {
×
NEW
735
                                queue.Forget(key)
×
NEW
736
                        }
×
NEW
737
                        queue.Done(key)
×
738

739
                }
740
        }, time.Second, stopCh)
741
        <-stopCh
1✔
742
        queue.ShutDown()
1✔
743
}
744

745
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
746
        return apicapi.ApicSlice{}
1✔
747
}
1✔
748

749
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
750
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
751
}
1✔
752

753
func (cont *AciController) initStaticObjs() {
1✔
754
        cont.env.InitStaticAciObjects()
1✔
755
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
756
                cont.globalStaticObjs())
1✔
757
}
1✔
758

759
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
760
        vmmProv = "Kubernetes"
1✔
761
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
762
                vmmProv = "OpenShift"
×
763
        }
×
764
        return
1✔
765
}
766

767
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
768
        var err error
1✔
769
        var privKey []byte
1✔
770
        var apicCert []byte
1✔
771

1✔
772
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
773

1✔
774
        if cont.config.ApicPrivateKeyPath != "" {
1✔
775
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
776
                if err != nil {
×
777
                        panic(err)
×
778
                }
779
        }
780
        if cont.config.ApicCertPath != "" {
1✔
781
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
782
                if err != nil {
×
783
                        panic(err)
×
784
                }
785
        }
786
        // If not defined, default is 1800
787
        if cont.config.ApicRefreshTimer == "" {
2✔
788
                cont.config.ApicRefreshTimer = "1800"
1✔
789
        }
1✔
790
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
791
        if err != nil {
1✔
792
                panic(err)
×
793
        }
794
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
795

1✔
796
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
797
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
798
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
799
                panic(err)
×
800
        }
801

802
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
803
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
804
                cont.config.ApicRefreshTickerAdjust = "150"
1✔
805
        }
1✔
806
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
807
        if err != nil {
1✔
808
                panic(err)
×
809
        }
810

811
        //If ApicSubscriptionDelay is not defined, default to 100ms
812
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
813
                cont.config.ApicSubscriptionDelay = 100
1✔
814
        }
1✔
815
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
816

1✔
817
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
818
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
819
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
820
        }
1✔
821

822
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
823
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
824
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
825
        }
1✔
826
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
827

1✔
828
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
829
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
830
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
831
        }
1✔
832

833
        // If not defined, default to 32
834
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
835
                cont.config.PodIpPoolChunkSize = 32
1✔
836
        }
1✔
837
        if !cont.config.ChainedMode {
2✔
838
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
839
        }
1✔
840

841
        // If ApicConnectionRetryLimit is not defined, default to 5
842
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
843
                cont.config.ApicConnectionRetryLimit = 5
1✔
844
        }
1✔
845
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
846

1✔
847
        // If not valid, default to 5000-65000
1✔
848
        // other permissible values 1-65000
1✔
849
        defStart := 5000
1✔
850
        defEnd := 65000
1✔
851
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
852
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
853
        }
1✔
854
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
855
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
856
        }
1✔
857
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
858
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
859
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
860
                cont.config.SnatDefaultPortRangeStart = defStart
×
861
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
862
        }
×
863

864
        // Set default value for pbr programming delay if services list is not empty
865
        // and delay value is empty
866
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
867
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
868
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
869
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
870
        }
×
871
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
872
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
873
        }
×
874

875
        // Set contract scope for snat svc graph to global by default
876
        if cont.config.SnatSvcContractScope == "" {
2✔
877
                cont.config.SnatSvcContractScope = "global"
1✔
878
        }
1✔
879
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
880
                cont.config.MaxSvcGraphNodes = 32
1✔
881
        }
1✔
882
        if !cont.config.ChainedMode {
2✔
883
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
884
        }
1✔
885
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
886
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
887
                privKey, apicCert, cont.config.AciPrefix,
1✔
888
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
889
                cont.config.AciVrfTenant)
1✔
890
        if err != nil {
1✔
891
                panic(err)
×
892
        }
893

894
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
895

1✔
896
        if len(cont.config.ApicHosts) != 0 {
1✔
897
        APIC_SWITCH:
×
898
                cont.log.WithFields(logrus.Fields{
×
899
                        "mod":  "APICAPI",
×
900
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
901
                }).Debug("Connecting to APIC to determine the Version")
×
902

×
903
                version, err := cont.apicConn.GetVersion()
×
904
                if err != nil {
×
905
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
906
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
907
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
908
                        goto APIC_SWITCH
×
909
                }
910
                cont.apicConn.CachedVersion = version
×
911
                apicapi.ApicVersion = version
×
912
                if version >= "4.2(4i)" {
×
913
                        cont.apicConn.SnatPbrFltrChain = true
×
914
                } else {
×
915
                        cont.apicConn.SnatPbrFltrChain = false
×
916
                }
×
917
                if version >= "5.2" {
×
918
                        cont.vmmClusterFaultSupported = true
×
919
                }
×
920
        } else { // For unit-tests
1✔
921
                cont.apicConn.SnatPbrFltrChain = true
1✔
922
        }
1✔
923

924
        if !cont.config.ChainedMode {
2✔
925
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
926
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
927
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
928
                        var expectedVrfRelations []string
×
929
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
930
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
931
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
932
                        if err != nil {
×
933
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
934
                                panic(err)
×
935
                        }
936
                }
937
        }
938

939
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.config.ChainedMode {
1✔
940
                //Clear fault instances when the controller starts
×
941
                cont.clearFaultInstances()
×
942
                //Subscribe for vmmEpPD for a given domain
×
943
                var tnTargetFilterEpg string
×
944
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
945
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
946
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
947
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
948
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
949
                        func(obj apicapi.ApicObject) bool {
×
950
                                cont.vmmEpPDChanged(obj)
×
951
                                return true
×
952
                        },
×
953
                        func(dn string) {
×
954
                                cont.vmmEpPDDeleted(dn)
×
955
                        })
×
956
        }
957

958
        cont.initStaticObjs()
1✔
959

1✔
960
        err = cont.env.PrepareRun(stopCh)
1✔
961
        if err != nil {
1✔
962
                panic(err.Error())
×
963
        }
964

965
        cont.apicConn.FullSyncHook = func() {
1✔
966
                // put a channel into each work queue and wait on it to
×
967
                // checkpoint object syncing in response to new subscription
×
968
                // updates
×
969
                cont.log.Debug("Starting checkpoint")
×
970
                var chans []chan struct{}
×
971
                qs := make([]workqueue.RateLimitingInterface, 0)
×
972
                _, ok := cont.env.(*K8sEnvironment)
×
973
                if ok {
×
974
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
975
                        if !cont.config.ChainedMode {
×
976
                                if !cont.config.DisableHppRendering {
×
977
                                        qs = append(qs, cont.netPolQueue)
×
978
                                }
×
979
                                if cont.config.EnableHppDirect {
×
980
                                        qs = append(qs, cont.remIpContQueue)
×
981
                                }
×
982
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
983
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
984
                                        cont.rdConfigQueue, cont.erspanQueue,
×
985
                                        cont.epgDnCacheUpdateQueue)
×
986
                        }
987
                }
988
                for _, q := range qs {
×
989
                        c := make(chan struct{})
×
990
                        chans = append(chans, c)
×
991
                        q.Add(c)
×
992
                }
×
993
                for _, c := range chans {
×
994
                        <-c
×
995
                }
×
996
                cont.log.Debug("Checkpoint complete")
×
997
        }
998

999
        if len(cont.config.ApicHosts) != 0 && !cont.config.ChainedMode {
1✔
1000
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
1001
                cont.scheduleRdConfig()
×
1002
                if strings.Contains(cont.config.Flavor, "openstack") {
×
1003
                        cont.setOpenStackSystemId()
×
1004
                }
×
1005
        }
1006

1007
        if !cont.config.ChainedMode {
2✔
1008
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
1009
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1010
                                []string{"hostprotPol"})
1✔
1011
                }
1✔
1012
        } else {
1✔
1013
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1014
                        []string{"fvBD", "fvAp"})
1✔
1015
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
1016
                        []string{"fvnsVlanInstP"}, "")
1✔
1017
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
1018
                        []string{"infraRsDomP"}, "")
1✔
1019
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
1020
                        []string{"physDomP"}, "")
1✔
1021
                cont.apicConn.AddSubscriptionClass("l3extDomP",
1✔
1022
                        []string{"l3extDomP"}, "")
1✔
1023
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
1024
                        []string{"infraRsVlanNs"}, "")
1✔
1025
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
1026
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
1027
                cont.apicConn.AddSubscriptionClass("l3extOut",
1✔
1028
                        []string{"l3extInstP", "l3extSubnet", "fvRsCons", "fvRsProv", "l3extRsEctx", "l3extRsL3DomAtt", "l3extLNodeP", "l3extRsNodeL3OutAtt", "ipRouteP", "ipNexthopP", "l3extLIfP", "l3extVirtualLIfP", "l3extRsDynPathAtt",
1✔
1029
                                "l3extRsPathL3OutAtt", "l3extMember", "l3extIp", "bgpExtP", "bgpPeerP", "bgpAsP", "bgpLocalAsnP", "bgpRsPeerPfxPol"}, "")
1✔
1030
                cont.apicConn.AddSubscriptionClass("bgpPeerPfxPol",
1✔
1031
                        []string{"bgpPeerPfxPol"}, "")
1✔
1032
        }
1✔
1033
        if !cont.config.ChainedMode {
2✔
1034
                // When a new class is added for subscriptio, check if its name attribute
1✔
1035
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
1036
                // in apicapi.go
1✔
1037
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
1038
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
1039
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
1040
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
1041
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
1042
                }
×
1043
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
1044
                        subscribeMo)
1✔
1045
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
1046
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
1047
                        []string{"fvRsCons"})
1✔
1048
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
1049
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
1050
                        cont.config.AciVmmController)
1✔
1051
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
1052
                // Since it is not supported for APIC versions < "5.0"
1✔
1053
                cont.addVmmInjectedLabel()
1✔
1054
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
1055
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
1056

1✔
1057
                var tnTargetFilter string
1✔
1058
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
1059
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
1060
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
1061
                        }
×
1062
                } else {
1✔
1063
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
1064
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
1065
                }
1✔
1066
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
1067
                        tnTargetFilter)
1✔
1068
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
1069
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
1070

1✔
1071
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
1072
                        func(obj apicapi.ApicObject) bool {
1✔
1073
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
1074
                                return true
×
1075
                        },
×
1076
                        func(dn string) {
×
1077
                                cont.SubnetDeleted(dn)
×
1078
                        })
×
1079

1080
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
1081
                        []string{"opflexODev"}, "")
1✔
1082

1✔
1083
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
1084
                        func(obj apicapi.ApicObject) bool {
1✔
1085
                                cont.opflexDeviceChanged(obj)
×
1086
                                return true
×
1087
                        },
×
1088
                        func(dn string) {
×
1089
                                cont.opflexDeviceDeleted(dn)
×
1090
                        })
×
1091

1092
                cont.apicConn.VersionUpdateHook =
1✔
1093
                        func() {
1✔
1094
                                cont.initStaticServiceObjs()
×
1095
                        }
×
1096
        }
1097
        go cont.apicConn.Run(stopCh)
1✔
1098
}
1099

1100
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1101
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
1102
        ticker := time.NewTicker(seconds * time.Second)
1✔
1103
        defer ticker.Stop()
1✔
1104

1✔
1105
        for {
2✔
1106
                select {
1✔
1107
                case <-ticker.C:
1✔
1108
                        if cont.config.EnableOpflexAgentReconnect {
1✔
1109
                                cont.checkChangeOfOpflexOdevAciPod()
×
1110
                        }
×
1111
                        if cont.config.AciMultipod {
1✔
1112
                                cont.checkChangeOfOdevAciPod()
×
1113
                        }
×
1114
                case <-stopCh:
1✔
1115
                        return
1✔
1116
                }
1117
        }
1118
}
1119

1120
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1121
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
1122
        ticker := time.NewTicker(seconds * time.Second)
1✔
1123
        defer ticker.Stop()
1✔
1124

1✔
1125
        for {
2✔
1126
                select {
1✔
1127
                case <-ticker.C:
1✔
1128
                        cont.deleteOldOpflexDevices()
1✔
1129
                case <-stopCh:
1✔
1130
                        return
1✔
1131
                }
1132
        }
1133
}
1134

1135
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1136
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
1137
        ticker := time.NewTicker(seconds * time.Second)
1✔
1138
        defer ticker.Stop()
1✔
1139

1✔
1140
        for {
2✔
1141
                select {
1✔
1142
                case <-ticker.C:
1✔
1143
                        cont.processDelayedEpSlices()
1✔
1144
                case <-stopCh:
1✔
1145
                        return
1✔
1146
                }
1147
        }
1148
}
1149

1150
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
1151
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
1152
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
1153
        iteration := 0
1✔
1154
        for {
2✔
1155
                // To avoid noisy logs, only printing once in 5 minutes
1✔
1156
                if iteration%5 == 0 {
2✔
1157
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
1158
                }
1✔
1159
                var nodeInfos []*nodeinfo.NodeInfo
1✔
1160
                cont.indexMutex.Lock()
1✔
1161
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
1162
                        func(nodeInfoObj interface{}) {
2✔
1163
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
1164
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
1165
                        })
1✔
1166
                expectedmap := make(map[string]map[string]bool)
1✔
1167
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
1168
                        for nodename, entry := range glinfo {
2✔
1169
                                if _, found := expectedmap[nodename]; !found {
2✔
1170
                                        newentry := make(map[string]bool)
1✔
1171
                                        newentry[entry.SnatPolicyName] = true
1✔
1172
                                        expectedmap[nodename] = newentry
1✔
1173
                                } else {
2✔
1174
                                        currententry := expectedmap[nodename]
1✔
1175
                                        currententry[entry.SnatPolicyName] = true
1✔
1176
                                        expectedmap[nodename] = currententry
1✔
1177
                                }
1✔
1178
                        }
1179
                }
1180
                cont.indexMutex.Unlock()
1✔
1181

1✔
1182
                for _, value := range nodeInfos {
2✔
1183
                        marked := false
1✔
1184
                        policyNames := value.Spec.SnatPolicyNames
1✔
1185
                        nodeName := value.ObjectMeta.Name
1✔
1186
                        _, ok := expectedmap[nodeName]
1✔
1187
                        if !ok && len(policyNames) > 0 {
2✔
1188
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
1189
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
1190
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1191
                                marked = true
1✔
1192
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
1193
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1194
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1195
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1196
                                marked = true
1✔
1197
                        } else {
2✔
1198
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
1199
                                        // No snatpolicies present
×
1200
                                        continue
×
1201
                                }
1202
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
1203
                                if !eq {
2✔
1204
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1205
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1206
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1207
                                        marked = true
1✔
1208
                                }
1✔
1209
                        }
1210
                        if marked {
2✔
1211
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
1212
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
1213
                                if err != nil {
1✔
1214
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
1215
                                        continue
×
1216
                                }
1217
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1218
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1219
                        } else if iteration%5 == 0 {
2✔
1220
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1221
                        }
1✔
1222
                }
1223
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1224
                iteration++
1✔
1225
        }
1226
}
1227

1228
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1229
        queueStop <-chan struct{}) {
1✔
1230
        go wait.Until(func() {
2✔
1231
                for {
2✔
1232
                        syncType, quit := queue.Get()
1✔
1233
                        if quit {
2✔
1234
                                break
1✔
1235
                        }
1236
                        var requeue bool
1✔
1237
                        if sType, ok := syncType.(string); ok {
2✔
1238
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1239
                                        requeue = f()
1✔
1240
                                }
1✔
1241
                        }
1242
                        if requeue {
1✔
1243
                                queue.AddRateLimited(syncType)
×
1244
                        } else {
1✔
1245
                                queue.Forget(syncType)
1✔
1246
                        }
1✔
1247
                        queue.Done(syncType)
1✔
1248
                }
1249
        }, time.Second, queueStop)
1250
        <-queueStop
1✔
1251
        queue.ShutDown()
1✔
1252
}
1253

1254
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1255
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1256
}
1✔
1257
func (cont *AciController) scheduleRdConfig() {
×
1258
        cont.syncQueue.AddRateLimited("rdConfig")
×
1259
}
×
1260
func (cont *AciController) scheduleCreateIstioCR() {
×
1261
        cont.syncQueue.AddRateLimited("istioCR")
×
1262
}
×
1263

1264
func (cont *AciController) addVmmInjectedLabel() {
1✔
1265
        if apicapi.ApicVersion >= "5.2" {
1✔
1266
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1267
                if err != nil {
×
1268
                        panic(err.Error())
×
1269
                }
1270
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1271
                if err != nil {
×
1272
                        panic(err.Error())
×
1273
                }
1274
        }
1275
        if apicapi.ApicVersion >= "5.0" {
2✔
1276
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1277
                if err != nil {
1✔
1278
                        panic(err.Error())
×
1279
                }
1280
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1281
                if err != nil {
1✔
1282
                        panic(err.Error())
×
1283
                }
1284
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1285
                if err != nil {
1✔
1286
                        panic(err.Error())
×
1287
                }
1288
        }
1289
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc