• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 9930

23 Sep 2024 02:20AM UTC coverage: 69.632% (-0.3%) from 69.949%
9930

push

travis-pro

web-flow
Merge pull request #1373 from noironetworks/allow_multiple_ifs

LLDPIf Cache updates

73 of 197 new or added lines in 3 files covered. (37.06%)

8 existing lines in 3 files now uncovered.

13141 of 18872 relevant lines covered (69.63%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.35
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        v1net "k8s.io/api/networking/v1"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/util/workqueue"
40

41
        "github.com/noironetworks/aci-containers/pkg/apicapi"
42
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
43
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
44
        hppv1 "github.com/noironetworks/aci-containers/pkg/hpp/apis/aci.hpp/v1"
45
        "github.com/noironetworks/aci-containers/pkg/index"
46
        "github.com/noironetworks/aci-containers/pkg/ipam"
47
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
48
        "github.com/noironetworks/aci-containers/pkg/metadata"
49
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
50
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
51
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
52
        "github.com/noironetworks/aci-containers/pkg/util"
53
)
54

55
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
56
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
57
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
58
type listNetworkPoliciesFunc func(string) (*v1net.NetworkPolicyList, error)
59
type listNamespacesFunc func() (*v1.NamespaceList, error)
60

61
type AciController struct {
62
        log    *logrus.Logger
63
        config *ControllerConfig
64
        env    Environment
65

66
        defaultEg string
67
        defaultSg string
68

69
        unitTestMode bool
70

71
        podQueue              workqueue.RateLimitingInterface
72
        netPolQueue           workqueue.RateLimitingInterface
73
        qosQueue              workqueue.RateLimitingInterface
74
        serviceQueue          workqueue.RateLimitingInterface
75
        snatQueue             workqueue.RateLimitingInterface
76
        netflowQueue          workqueue.RateLimitingInterface
77
        erspanQueue           workqueue.RateLimitingInterface
78
        snatNodeInfoQueue     workqueue.RateLimitingInterface
79
        rdConfigQueue         workqueue.RateLimitingInterface
80
        istioQueue            workqueue.RateLimitingInterface
81
        nodeFabNetAttQueue    workqueue.RateLimitingInterface
82
        netFabConfigQueue     workqueue.RateLimitingInterface
83
        nadVlanMapQueue       workqueue.RateLimitingInterface
84
        fabricVlanPoolQueue   workqueue.RateLimitingInterface
85
        netFabL3ConfigQueue   workqueue.RateLimitingInterface
86
        remIpContQueue        workqueue.RateLimitingInterface
87
        epgDnCacheUpdateQueue workqueue.RateLimitingInterface
88
        lldpIfQueue           workqueue.RateLimitingInterface
89

90
        namespaceIndexer                     cache.Indexer
91
        namespaceInformer                    cache.Controller
92
        podIndexer                           cache.Indexer
93
        podInformer                          cache.Controller
94
        endpointsIndexer                     cache.Indexer
95
        endpointsInformer                    cache.Controller
96
        serviceIndexer                       cache.Indexer
97
        serviceInformer                      cache.Controller
98
        replicaSetIndexer                    cache.Indexer
99
        replicaSetInformer                   cache.Controller
100
        deploymentIndexer                    cache.Indexer
101
        deploymentInformer                   cache.Controller
102
        nodeIndexer                          cache.Indexer
103
        nodeInformer                         cache.Controller
104
        networkPolicyIndexer                 cache.Indexer
105
        networkPolicyInformer                cache.Controller
106
        snatIndexer                          cache.Indexer
107
        snatInformer                         cache.Controller
108
        snatNodeInfoIndexer                  cache.Indexer
109
        snatNodeInformer                     cache.Controller
110
        crdInformer                          cache.Controller
111
        rdConfigInformer                     cache.Controller
112
        rdConfigIndexer                      cache.Indexer
113
        qosIndexer                           cache.Indexer
114
        qosInformer                          cache.Controller
115
        netflowIndexer                       cache.Indexer
116
        netflowInformer                      cache.Controller
117
        erspanIndexer                        cache.Indexer
118
        erspanInformer                       cache.Controller
119
        nodePodIfIndexer                     cache.Indexer
120
        nodePodIfInformer                    cache.Controller
121
        istioIndexer                         cache.Indexer
122
        istioInformer                        cache.Controller
123
        endpointSliceIndexer                 cache.Indexer
124
        endpointSliceInformer                cache.Controller
125
        snatCfgInformer                      cache.Controller
126
        updatePod                            podUpdateFunc
127
        updateNode                           nodeUpdateFunc
128
        updateServiceStatus                  serviceUpdateFunc
129
        listNetworkPolicies                  listNetworkPoliciesFunc
130
        listNamespaces                       listNamespacesFunc
131
        nodeFabNetAttInformer                cache.SharedIndexInformer
132
        netFabConfigInformer                 cache.SharedIndexInformer
133
        nadVlanMapInformer                   cache.SharedIndexInformer
134
        fabricVlanPoolInformer               cache.SharedIndexInformer
135
        networkFabricL3ConfigurationInformer cache.SharedIndexInformer
136
        fabNetAttClient                      *fabattclset.Clientset
137
        proactiveConfInformer                cache.SharedIndexInformer
138

139
        indexMutex sync.Mutex
140
        hppMutex   sync.Mutex
141

142
        configuredPodNetworkIps *netIps
143
        podNetworkIps           *netIps
144
        serviceIps              *ipam.IpCache
145
        staticServiceIps        *netIps
146
        nodeServiceIps          *netIps
147

148
        // index of pods matched by deployments
149
        depPods *index.PodSelectorIndex
150
        // index of pods matched by network policies
151
        netPolPods *index.PodSelectorIndex
152
        // index of pods matched by network policy ingress rules
153
        netPolIngressPods *index.PodSelectorIndex
154
        // index of pods matched by network policy egress rules
155
        netPolEgressPods *index.PodSelectorIndex
156
        // index of IP addresses contained in endpoints objects
157
        endpointsIpIndex cidranger.Ranger
158
        // index of service target ports
159
        targetPortIndex map[string]*portIndexEntry
160
        // index of ip blocks referenced by network policy egress rules
161
        netPolSubnetIndex cidranger.Ranger
162
        // index of pods matched by erspan policies
163
        erspanPolPods *index.PodSelectorIndex
164

165
        apicConn *apicapi.ApicConnection
166

167
        nodeServiceMetaCache map[string]*nodeServiceMeta
168
        nodeACIPod           map[string]aciPodAnnot
169
        nodeACIPodAnnot      map[string]aciPodAnnot
170
        nodeOpflexDevice     map[string]apicapi.ApicSlice
171
        nodePodNetCache      map[string]*nodePodNetMeta
172
        serviceMetaCache     map[string]*serviceMeta
173
        snatPolicyCache      map[string]*ContSnatPolicy
174
        delayedEpSlices      []*DelayedEpSlice
175
        snatServices         map[string]bool
176
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
177
        rdConfigCache        map[string]*rdConfig.RdConfig
178
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
179
        istioCache           map[string]*istiov1.AciIstioOperator
180
        podIftoEp            map[string]*EndPointData
181
        // Node Name and Policy Name
182
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
183
        nodeSyncEnabled     bool
184
        serviceSyncEnabled  bool
185
        snatSyncEnabled     bool
186
        syncQueue           workqueue.RateLimitingInterface
187
        syncProcessors      map[string]func() bool
188
        serviceEndPoints    ServiceEndPointType
189
        crdHandlers         map[string]func(*AciController, <-chan struct{})
190
        stopCh              <-chan struct{}
191
        //index of containerportname to ctrPortNameEntry
192
        ctrPortNameCache map[string]*ctrPortNameEntry
193
        // named networkPolicies
194
        nmPortNp map[string]bool
195
        //maps network policy hash to hpp
196
        hppRef map[string]hppReference
197
        //map for ns to remoteIpCont
198
        nsRemoteIpCont map[string]remoteIpCont
199
        // cache to look for Epg DNs which are bound to Vmm domain
200
        cachedEpgDns             []string
201
        vmmClusterFaultSupported bool
202
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
203
        //Used in Shared mode
204
        sharedEncapCache       map[int]*sharedEncapData
205
        sharedEncapAepCache    map[string]map[int]bool
206
        sharedEncapSviCache    map[int]*NfL3Data
207
        sharedEncapVrfCache    map[string]*NfVrfData
208
        sharedEncapTenantCache map[string]*NfTenantData
209
        nfl3configGenerationId int64
210
        // vlan to propertiesList
211
        sharedEncapNfcCache         map[int]*NfcData
212
        sharedEncapNfcVlanMap       map[int]*NfcData
213
        sharedEncapNfcLabelMap      map[string]*NfcData
214
        sharedEncapNfcAppProfileMap map[string]bool
215
        // nadVlanMap encapLabel to vlan
216
        sharedEncapLabelMap      map[string][]int
217
        lldpIfCache              map[string]*NfLLDPIfData
218
        globalVlanConfig         globalVlanConfig
219
        fabricVlanPoolMap        map[string]map[string]string
220
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
221
        openStackSystemId        string
222
}
223

224
// NfLLDPIfData is the value type of AciController.lldpIfCache.
type NfLLDPIfData struct {
	LLDPIf string
	// Refs is currently managed at the NAD level; tracking at a finer
	// granularity would add interface-tracking complexity without
	// sufficient benefit.
	Refs map[string]bool
}
231

232
type NfL3OutData struct {
233
        // +kubebuilder:validation:Enum:"import"
234
        RtCtrl     string
235
        PodId      int
236
        RtrNodeMap map[int]*fabattv1.FabricL3OutRtrNode
237
        ExtEpgMap  map[string]*fabattv1.PolicyPrefixGroup
238
        SviMap     map[int]bool
239
}
240

241
type NfTenantData struct {
242
        CommonTenant     bool
243
        L3OutConfig      map[string]*NfL3OutData
244
        BGPPeerPfxConfig map[string]*fabattv1.BGPPeerPrefixPolicy
245
}
246

247
type NfVrfData struct {
248
        TenantConfig map[string]*NfTenantData
249
}
250

251
type NfL3Networks struct {
252
        fabattv1.PrimaryNetwork
253
        Subnets map[string]*fabattv1.FabricL3Subnet
254
}
255

256
type NfL3Data struct {
257
        Tenant      string
258
        Vrf         fabattv1.VRF
259
        PodId       int
260
        ConnectedNw *NfL3Networks
261
        NetAddr     map[string]*RoutedNetworkData
262
        Nodes       map[int]fabattv1.FabricL3OutNode
263
}
264

265
// remoteIpCont is keyed by IP; each value is that pod's label map.
type remoteIpCont map[string]map[string]string
267

268
type NfcData struct {
269
        Aeps map[string]bool
270
        Epg  fabattv1.Epg
271
}
272

273
type sharedEncapData struct {
274
        //node to NAD to pods
275
        Pods   map[string]map[string][]string
276
        NetRef map[string]*AdditionalNetworkMeta
277
        Aeps   map[string]bool
278
}
279

280
type globalVlanConfig struct {
281
        SharedPhysDom apicapi.ApicObject
282
        SharedL3Dom   apicapi.ApicObject
283
}
284

285
type hppReference struct {
286
        RefCount uint              `json:"ref-count,omitempty"`
287
        Npkeys   []string          `json:"npkeys,omitempty"`
288
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
289
        HppCr    hppv1.HostprotPol `json:"hpp-cr,omitempty"`
290
}
291

292
type DelayedEpSlice struct {
293
        ServiceKey  string
294
        OldEpSlice  *discovery.EndpointSlice
295
        NewEpSlice  *discovery.EndpointSlice
296
        DelayedTime time.Time
297
}
298

299
// aciPodAnnot is the value type of AciController.nodeACIPod and
// AciController.nodeACIPodAnnot.
type aciPodAnnot struct {
	aciPod             string
	isSingleOpflexOdev bool
	// Disconnect and connect timestamps.
	disconnectTime, connectTime time.Time
}
305

306
type nodeServiceMeta struct {
307
        serviceEp metadata.ServiceEndpoint
308
}
309

310
type nodePodNetMeta struct {
311
        nodePods            map[string]bool
312
        podNetIps           metadata.NetIps
313
        podNetIpsAnnotation string
314
}
315

316
// openstackOpflexOdevInfo is the value type of
// AciController.openStackFabricPathDnMap.
type openstackOpflexOdevInfo struct {
	// opflexODevDn is a string-keyed set.
	opflexODevDn map[string]struct{}
	fabricPathDn string
}
320

321
// serviceMeta holds three IP lists - requested, ingress and static
// ingress. It is the value type of AciController.serviceMetaCache.
type serviceMeta struct {
	requestedIps, ingressIps, staticIngressIps []net.IP
}
326

327
// ipIndexEntry pairs an IP network with a set of string keys. Its Network
// method returns ipNet.
type ipIndexEntry struct {
	ipNet net.IPNet
	// keys is a string-keyed set.
	keys map[string]bool
}
331

332
type targetPort struct {
333
        proto v1.Protocol
334
        ports []int
335
}
336

337
type portIndexEntry struct {
338
        port              targetPort
339
        serviceKeys       map[string]bool
340
        networkPolicyKeys map[string]bool
341
}
342

343
// portRangeSnat is a pair of integer bounds, start and end.
// NOTE(review): whether end is inclusive is not visible here - confirm
// against the users of this type.
type portRangeSnat struct {
	start, end int
}
347

348
// EndPointData is the controller-side record of a PodIF. It is the value
// type of AciController.podIftoEp.
type EndPointData struct {
	MacAddr, EPG, Namespace, AppProfile string
}
355

356
// ctrPortNameEntry is the value type of AciController.ctrPortNameCache,
// which is indexed by container port name.
type ctrPortNameEntry struct {
	// ctrNmpToPods is keyed by proto+port; each value is a set of pods.
	ctrNmpToPods map[string]map[string]bool
}
360

361
// LinkData holds two string lists, Link and Pods. It is the innermost
// value type of AdditionalNetworkMeta.FabricLink.
type LinkData struct {
	Link, Pods []string
}
365

366
// RoutedNodeData pairs an address string with an integer index. It is the
// value type of RoutedNetworkData.nodeMap.
type RoutedNodeData struct {
	addr string
	idx  int
}
370

371
type RoutedNetworkData struct {
372
        subnet       string
373
        netAddress   string
374
        maskLen      int
375
        numAllocated int
376
        maxAddresses int
377
        baseAddress  net.IP
378
        nodeMap      map[string]RoutedNodeData
379
        availableMap map[int]bool
380
}
381

382
type AdditionalNetworkMeta struct {
383
        NetworkName string
384
        EncapVlan   string
385
        //node+localiface->fabricLinks
386
        FabricLink map[string]map[string]LinkData
387
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
388
        Mode       util.EncapMode
389
}
390

391
type ServiceEndPointType interface {
392
        InitClientInformer(kubeClient *kubernetes.Clientset)
393
        Run(stopCh <-chan struct{})
394
        Wait(stopCh <-chan struct{})
395
        UpdateServicesForNode(nodename string)
396
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
397
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
398
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
399
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
400
}
401

402
type serviceEndpoint struct {
403
        cont *AciController
404
}
405
type serviceEndpointSlice struct {
406
        cont *AciController
407
}
408

409
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
410
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
411
}
×
412

413
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
414
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
415
}
×
416

417
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
418
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
419
}
1✔
420

421
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
422
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
423
}
1✔
424

425
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
426
        cache.WaitForCacheSync(stopCh,
1✔
427
                sep.cont.endpointsInformer.HasSynced,
1✔
428
                sep.cont.serviceInformer.HasSynced)
1✔
429
}
1✔
430

431
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
432
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
433
        cache.WaitForCacheSync(stopCh,
1✔
434
                seps.cont.endpointSliceInformer.HasSynced,
1✔
435
                seps.cont.serviceInformer.HasSynced)
1✔
436
}
1✔
437

438
func (e *ipIndexEntry) Network() net.IPNet {
1✔
439
        return e.ipNet
1✔
440
}
1✔
441

442
func newNodePodNetMeta() *nodePodNetMeta {
1✔
443
        return &nodePodNetMeta{
1✔
444
                nodePods: make(map[string]bool),
1✔
445
        }
1✔
446
}
1✔
447

448
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
449
        return workqueue.NewNamedRateLimitingQueue(
1✔
450
                workqueue.NewMaxOfRateLimiter(
1✔
451
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
452
                                10*time.Second),
1✔
453
                        &workqueue.BucketRateLimiter{
1✔
454
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
455
                        },
1✔
456
                ),
1✔
457
                "delta")
1✔
458
}
1✔
459

460
func createQueueWithEventLimits(name string, eventsPerSec float64, burstSize int) workqueue.RateLimitingInterface {
1✔
461
        return workqueue.NewNamedRateLimitingQueue(
1✔
462
                workqueue.NewMaxOfRateLimiter(
1✔
463
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
464
                                10*time.Second),
1✔
465
                        &workqueue.BucketRateLimiter{
1✔
466
                                Limiter: rate.NewLimiter(rate.Limit(eventsPerSec), int(burstSize)),
1✔
467
                        },
1✔
468
                ),
1✔
469
                name)
1✔
470
}
1✔
471

472
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
473
        cont := &AciController{
1✔
474
                log:          log,
1✔
475
                config:       config,
1✔
476
                env:          env,
1✔
477
                defaultEg:    "",
1✔
478
                defaultSg:    "",
1✔
479
                unitTestMode: unittestmode,
1✔
480

1✔
481
                podQueue:              createQueue("pod"),
1✔
482
                netPolQueue:           createQueue("networkPolicy"),
1✔
483
                qosQueue:              createQueue("qos"),
1✔
484
                netflowQueue:          createQueue("netflow"),
1✔
485
                erspanQueue:           createQueue("erspan"),
1✔
486
                serviceQueue:          createQueue("service"),
1✔
487
                snatQueue:             createQueue("snat"),
1✔
488
                snatNodeInfoQueue:     createQueue("snatnodeinfo"),
1✔
489
                rdConfigQueue:         createQueue("rdconfig"),
1✔
490
                istioQueue:            createQueue("istio"),
1✔
491
                nodeFabNetAttQueue:    createQueue("nodefabricnetworkattachment"),
1✔
492
                netFabConfigQueue:     createQueue("networkfabricconfiguration"),
1✔
493
                nadVlanMapQueue:       createQueue("nadvlanmap"),
1✔
494
                fabricVlanPoolQueue:   createQueue("fabricvlanpool"),
1✔
495
                netFabL3ConfigQueue:   createQueue("networkfabricl3configuration"),
1✔
496
                remIpContQueue:        createQueue("remoteIpContainer"),
1✔
497
                epgDnCacheUpdateQueue: createQueue("epgDnCache"),
1✔
498
                lldpIfQueue:           createQueueWithEventLimits("lldpIf", 5, 200),
1✔
499
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
500
                        &workqueue.BucketRateLimiter{
1✔
501
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
502
                        }, "sync"),
1✔
503

1✔
504
                configuredPodNetworkIps: newNetIps(),
1✔
505
                podNetworkIps:           newNetIps(),
1✔
506
                serviceIps:              ipam.NewIpCache(),
1✔
507
                staticServiceIps:        newNetIps(),
1✔
508
                nodeServiceIps:          newNetIps(),
1✔
509

1✔
510
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
511
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
512
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
513

1✔
514
                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
1✔
515
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
1✔
516
                serviceMetaCache:            make(map[string]*serviceMeta),
1✔
517
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
1✔
518
                snatServices:                make(map[string]bool),
1✔
519
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
1✔
520
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
1✔
521
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
1✔
522
                podIftoEp:                   make(map[string]*EndPointData),
1✔
523
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
524
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
1✔
525
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
1✔
526
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
1✔
527
                nmPortNp:                    make(map[string]bool),
1✔
528
                hppRef:                      make(map[string]hppReference),
1✔
529
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
1✔
530
                sharedEncapCache:            make(map[int]*sharedEncapData),
1✔
531
                sharedEncapAepCache:         make(map[string]map[int]bool),
1✔
532
                sharedEncapSviCache:         make(map[int]*NfL3Data),
1✔
533
                sharedEncapVrfCache:         make(map[string]*NfVrfData),
1✔
534
                sharedEncapTenantCache:      make(map[string]*NfTenantData),
1✔
535
                sharedEncapNfcCache:         make(map[int]*NfcData),
1✔
536
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
1✔
537
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
1✔
538
                sharedEncapNfcAppProfileMap: make(map[string]bool),
1✔
539
                sharedEncapLabelMap:         make(map[string][]int),
1✔
540
                lldpIfCache:                 make(map[string]*NfLLDPIfData),
1✔
541
                fabricVlanPoolMap:           make(map[string]map[string]string),
1✔
542
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
1✔
543
                nsRemoteIpCont:              make(map[string]remoteIpCont),
1✔
544
        }
1✔
545
        cont.syncProcessors = map[string]func() bool{
1✔
546
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
547
                "rdConfig":       cont.syncRdConfig,
1✔
548
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
549
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
550
                   and aci-containers-operator images for istio.io/istio package
1✔
551

1✔
552
                "istioCR":        cont.createIstioCR,
1✔
553
                */
1✔
554
        }
1✔
555
        return cont
1✔
556
}
1✔
557

558
func (cont *AciController) Init() {
×
559
        if cont.config.ChainedMode {
×
560
                cont.log.Info("In chained mode")
×
561
        }
×
562

563
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
564
        if err != nil {
×
565
                cont.log.Error("Could not serialize default endpoint group")
×
566
                panic(err.Error())
×
567
        }
568
        cont.defaultEg = string(egdata)
×
569

×
570
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
571
        if err != nil {
×
572
                cont.log.Error("Could not serialize default security groups")
×
573
                panic(err.Error())
×
574
        }
575
        cont.defaultSg = string(sgdata)
×
576

×
577
        cont.log.Debug("Initializing IPAM")
×
578
        cont.initIpam()
×
579
        // check if the cluster supports endpoint slices
×
580
        // if cluster doesn't have the support fallback to endpoints
×
581
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
582
        if util.IsEndPointSlicesSupported(kubeClient) {
×
583
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
584
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
585
                cont.log.Info("Initializing ServiceEndpointSlices")
×
586
        } else {
×
587
                cont.serviceEndPoints = &serviceEndpoint{}
×
588
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
589
                cont.log.Info("Initializing ServiceEndpoints")
×
590
        }
×
591

592
        err = cont.env.Init(cont)
×
593
        if err != nil {
×
594
                panic(err.Error())
×
595
        }
596
}
597

598
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
599
        store cache.Store, handler func(interface{}) bool,
600
        deleteHandler func(string) bool,
601
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
602
        go wait.Until(func() {
2✔
603
                for {
2✔
604
                        key, quit := queue.Get()
1✔
605
                        if quit {
2✔
606
                                break
1✔
607
                        }
608

609
                        var requeue bool
1✔
610
                        switch key := key.(type) {
1✔
611
                        case chan struct{}:
×
612
                                close(key)
×
613
                        case string:
1✔
614
                                if strings.HasPrefix(key, "DELETED_") {
2✔
615
                                        delKey := strings.Trim(key, "DELETED_")
1✔
616
                                        requeue = deleteHandler(delKey)
1✔
617
                                } else {
2✔
618
                                        obj, exists, err := store.GetByKey(key)
1✔
619
                                        if err != nil {
1✔
620
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
621
                                        }
×
622
                                        //Handle Add/Update/Delete
623
                                        if exists && handler != nil {
2✔
624
                                                requeue = handler(obj)
1✔
625
                                        }
1✔
626
                                        //Handle Post Delete
627
                                        if !exists && postDelHandler != nil {
1✔
628
                                                requeue = postDelHandler()
×
629
                                        }
×
630
                                }
631
                        }
632
                        if requeue {
2✔
633
                                queue.AddRateLimited(key)
1✔
634
                        } else {
2✔
635
                                queue.Forget(key)
1✔
636
                        }
1✔
637
                        queue.Done(key)
1✔
638
                }
639
        }, time.Second, stopCh)
640
        <-stopCh
1✔
641
        queue.ShutDown()
1✔
642
}
643

644
func (cont *AciController) processRemIpContQueue(queue workqueue.RateLimitingInterface,
645
        handler func(interface{}) bool,
646
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
647
        go wait.Until(func() {
2✔
648
                for {
2✔
649
                        key, quit := queue.Get()
1✔
650
                        if quit {
2✔
651
                                break
1✔
652
                        }
653

654
                        var requeue bool
1✔
655
                        switch key := key.(type) {
1✔
656
                        case chan struct{}:
×
657
                                close(key)
×
658
                        case string:
1✔
659
                                if handler != nil {
2✔
660
                                        requeue = handler(key)
1✔
661
                                }
1✔
662
                                if postDelHandler != nil {
2✔
663
                                        requeue = postDelHandler()
1✔
664
                                }
1✔
665
                        }
666
                        if requeue {
1✔
667
                                queue.AddRateLimited(key)
×
668
                        } else {
1✔
669
                                queue.Forget(key)
1✔
670
                        }
1✔
671
                        queue.Done(key)
1✔
672

673
                }
674
        }, time.Second, stopCh)
675
        <-stopCh
1✔
676
        queue.ShutDown()
1✔
677
}
678

679
func (cont *AciController) processEpgDnCacheUpdateQueue(queue workqueue.RateLimitingInterface,
680
        handler func(interface{}) bool,
681
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
682
        go wait.Until(func() {
2✔
683
                for {
2✔
684
                        key, quit := queue.Get()
1✔
685
                        if quit {
2✔
686
                                break
1✔
687
                        }
688

689
                        var requeue bool
1✔
690
                        switch key := key.(type) {
1✔
691
                        case chan struct{}:
×
692
                                close(key)
×
693
                        case bool:
1✔
694
                                if handler != nil {
2✔
695
                                        requeue = handler(key)
1✔
696
                                }
1✔
697
                                if postDelHandler != nil {
1✔
698
                                        requeue = postDelHandler()
×
699
                                }
×
700
                        }
701
                        if requeue {
1✔
702
                                queue.AddRateLimited(key)
×
703
                        } else {
1✔
704
                                queue.Forget(key)
1✔
705
                        }
1✔
706
                        queue.Done(key)
1✔
707

708
                }
709
        }, time.Second, stopCh)
710
        <-stopCh
1✔
711
        queue.ShutDown()
1✔
712
}
713

714
func (cont *AciController) processLLDPIfQueue(queue workqueue.RateLimitingInterface,
715
        handler func(interface{}) bool, stopCh <-chan struct{}) {
1✔
716
        go wait.Until(func() {
2✔
717
                for {
2✔
718
                        key, quit := queue.Get()
1✔
719
                        if quit {
2✔
720
                                break
1✔
721
                        }
NEW
722
                        var requeue bool
×
NEW
723
                        switch key := key.(type) {
×
NEW
724
                        case chan struct{}:
×
NEW
725
                                close(key)
×
NEW
726
                        case string:
×
NEW
727
                                if handler != nil {
×
NEW
728
                                        requeue = handler(key)
×
NEW
729
                                }
×
730
                        }
NEW
731
                        if requeue {
×
NEW
732
                                queue.AddRateLimited(key)
×
NEW
733
                        } else {
×
NEW
734
                                queue.Forget(key)
×
NEW
735
                        }
×
NEW
736
                        queue.Done(key)
×
737

738
                }
739
        }, time.Second, stopCh)
740
        <-stopCh
1✔
741
        queue.ShutDown()
1✔
742
}
743

744
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
745
        return apicapi.ApicSlice{}
1✔
746
}
1✔
747

748
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
749
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
750
}
1✔
751

752
func (cont *AciController) initStaticObjs() {
1✔
753
        cont.env.InitStaticAciObjects()
1✔
754
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
755
                cont.globalStaticObjs())
1✔
756
}
1✔
757

758
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
759
        vmmProv = "Kubernetes"
1✔
760
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
761
                vmmProv = "OpenShift"
×
762
        }
×
763
        return
1✔
764
}
765

766
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
767
        var err error
1✔
768
        var privKey []byte
1✔
769
        var apicCert []byte
1✔
770

1✔
771
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
772

1✔
773
        if cont.config.ApicPrivateKeyPath != "" {
1✔
774
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
775
                if err != nil {
×
776
                        panic(err)
×
777
                }
778
        }
779
        if cont.config.ApicCertPath != "" {
1✔
780
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
781
                if err != nil {
×
782
                        panic(err)
×
783
                }
784
        }
785
        // If not defined, default is 1800
786
        if cont.config.ApicRefreshTimer == "" {
2✔
787
                cont.config.ApicRefreshTimer = "1800"
1✔
788
        }
1✔
789
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
790
        if err != nil {
1✔
791
                panic(err)
×
792
        }
793
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
794

1✔
795
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
796
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
797
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
798
                panic(err)
×
799
        }
800

801
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
802
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
803
                cont.config.ApicRefreshTickerAdjust = "150"
1✔
804
        }
1✔
805
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
806
        if err != nil {
1✔
807
                panic(err)
×
808
        }
809

810
        //If ApicSubscriptionDelay is not defined, default to 100ms
811
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
812
                cont.config.ApicSubscriptionDelay = 100
1✔
813
        }
1✔
814
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
815

1✔
816
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
817
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
818
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
819
        }
1✔
820

821
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
822
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
823
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
824
        }
1✔
825
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
826

1✔
827
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
828
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
829
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
830
        }
1✔
831

832
        // If not defined, default to 32
833
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
834
                cont.config.PodIpPoolChunkSize = 32
1✔
835
        }
1✔
836
        if !cont.config.ChainedMode {
2✔
837
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
838
        }
1✔
839

840
        // If ApicConnectionRetryLimit is not defined, default to 5
841
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
842
                cont.config.ApicConnectionRetryLimit = 5
1✔
843
        }
1✔
844
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
845

1✔
846
        // If not valid, default to 5000-65000
1✔
847
        // other permissible values 1-65000
1✔
848
        defStart := 5000
1✔
849
        defEnd := 65000
1✔
850
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
851
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
852
        }
1✔
853
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
854
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
855
        }
1✔
856
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
857
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
858
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
859
                cont.config.SnatDefaultPortRangeStart = defStart
×
860
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
861
        }
×
862

863
        // Set default value for pbr programming delay if services list is not empty
864
        // and delay value is empty
865
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
866
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
867
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
868
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
869
        }
×
870
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
871
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
872
        }
×
873

874
        // Set contract scope for snat svc graph to global by default
875
        if cont.config.SnatSvcContractScope == "" {
2✔
876
                cont.config.SnatSvcContractScope = "global"
1✔
877
        }
1✔
878
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
879
                cont.config.MaxSvcGraphNodes = 32
1✔
880
        }
1✔
881
        if !cont.config.ChainedMode {
2✔
882
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
883
        }
1✔
884
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
885
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
886
                privKey, apicCert, cont.config.AciPrefix,
1✔
887
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
888
                cont.config.AciVrfTenant)
1✔
889
        if err != nil {
1✔
890
                panic(err)
×
891
        }
892

893
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
894

1✔
895
        if len(cont.config.ApicHosts) != 0 {
1✔
896
        APIC_SWITCH:
×
897
                cont.log.WithFields(logrus.Fields{
×
898
                        "mod":  "APICAPI",
×
899
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
900
                }).Debug("Connecting to APIC to determine the Version")
×
901

×
902
                version, err := cont.apicConn.GetVersion()
×
903
                if err != nil {
×
904
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
905
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
906
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
907
                        goto APIC_SWITCH
×
908
                }
909
                cont.apicConn.CachedVersion = version
×
910
                apicapi.ApicVersion = version
×
911
                if version >= "4.2(4i)" {
×
912
                        cont.apicConn.SnatPbrFltrChain = true
×
913
                } else {
×
914
                        cont.apicConn.SnatPbrFltrChain = false
×
915
                }
×
916
                if version >= "5.2" {
×
917
                        cont.vmmClusterFaultSupported = true
×
918
                }
×
919
        } else { // For unit-tests
1✔
920
                cont.apicConn.SnatPbrFltrChain = true
1✔
921
        }
1✔
922

923
        if !cont.config.ChainedMode {
2✔
924
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
925
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
926
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
927
                        var expectedVrfRelations []string
×
928
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
929
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
930
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
931
                        if err != nil {
×
932
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
933
                                panic(err)
×
934
                        }
935
                }
936
        }
937

938
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.config.ChainedMode {
1✔
939
                //Clear fault instances when the controller starts
×
940
                cont.clearFaultInstances()
×
941
                //Subscribe for vmmEpPD for a given domain
×
942
                var tnTargetFilterEpg string
×
943
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
944
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
945
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
946
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
947
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
948
                        func(obj apicapi.ApicObject) bool {
×
949
                                cont.vmmEpPDChanged(obj)
×
950
                                return true
×
951
                        },
×
952
                        func(dn string) {
×
953
                                cont.vmmEpPDDeleted(dn)
×
954
                        })
×
955
        }
956

957
        cont.initStaticObjs()
1✔
958

1✔
959
        err = cont.env.PrepareRun(stopCh)
1✔
960
        if err != nil {
1✔
961
                panic(err.Error())
×
962
        }
963

964
        cont.apicConn.FullSyncHook = func() {
1✔
965
                // put a channel into each work queue and wait on it to
×
966
                // checkpoint object syncing in response to new subscription
×
967
                // updates
×
968
                cont.log.Debug("Starting checkpoint")
×
969
                var chans []chan struct{}
×
970
                qs := make([]workqueue.RateLimitingInterface, 0)
×
971
                _, ok := cont.env.(*K8sEnvironment)
×
972
                if ok {
×
973
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
974
                        if !cont.config.ChainedMode {
×
975
                                if !cont.config.DisableHppRendering {
×
976
                                        qs = append(qs, cont.netPolQueue)
×
977
                                }
×
978
                                if cont.config.EnableHppDirect {
×
979
                                        qs = append(qs, cont.remIpContQueue)
×
980
                                }
×
981
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
982
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
983
                                        cont.rdConfigQueue, cont.erspanQueue,
×
984
                                        cont.epgDnCacheUpdateQueue)
×
985
                        }
986
                }
987
                for _, q := range qs {
×
988
                        c := make(chan struct{})
×
989
                        chans = append(chans, c)
×
990
                        q.Add(c)
×
991
                }
×
992
                for _, c := range chans {
×
993
                        <-c
×
994
                }
×
995
                cont.log.Debug("Checkpoint complete")
×
996
        }
997

998
        if len(cont.config.ApicHosts) != 0 && !cont.config.ChainedMode {
1✔
999
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
1000
                cont.scheduleRdConfig()
×
1001
                if strings.Contains(cont.config.Flavor, "openstack") {
×
1002
                        cont.setOpenStackSystemId()
×
1003
                }
×
1004
        }
1005

1006
        if !cont.config.ChainedMode {
2✔
1007
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
1008
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1009
                                []string{"hostprotPol"})
1✔
1010
                }
1✔
1011
        } else {
1✔
1012
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1013
                        []string{"fvBD", "fvAp"})
1✔
1014
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
1015
                        []string{"fvnsVlanInstP"}, "")
1✔
1016
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
1017
                        []string{"infraRsDomP"}, "")
1✔
1018
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
1019
                        []string{"physDomP"}, "")
1✔
1020
                cont.apicConn.AddSubscriptionClass("l3extDomP",
1✔
1021
                        []string{"l3extDomP"}, "")
1✔
1022
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
1023
                        []string{"infraRsVlanNs"}, "")
1✔
1024
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
1025
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
1026
                cont.apicConn.AddSubscriptionClass("l3extOut",
1✔
1027
                        []string{"l3extInstP", "l3extSubnet", "fvRsCons", "fvRsProv", "l3extRsEctx", "l3extRsL3DomAtt", "l3extLNodeP", "l3extRsNodeL3OutAtt", "ipRouteP", "ipNexthopP", "l3extLIfP", "l3extVirtualLIfP", "l3extRsDynPathAtt",
1✔
1028
                                "l3extRsPathL3OutAtt", "l3extMember", "l3extIp", "bgpExtP", "bgpPeerP", "bgpAsP", "bgpLocalAsnP", "bgpRsPeerPfxPol"}, "")
1✔
1029
                cont.apicConn.AddSubscriptionClass("bgpPeerPfxPol",
1✔
1030
                        []string{"bgpPeerPfxPol"}, "")
1✔
1031
        }
1✔
1032
        if !cont.config.ChainedMode {
2✔
1033
                // When a new class is added for subscriptio, check if its name attribute
1✔
1034
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
1035
                // in apicapi.go
1✔
1036
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
1037
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
1038
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
1039
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
1040
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
1041
                }
×
1042
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
1043
                        subscribeMo)
1✔
1044
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
1045
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
1046
                        []string{"fvRsCons"})
1✔
1047
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
1048
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
1049
                        cont.config.AciVmmController)
1✔
1050
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
1051
                // Since it is not supported for APIC versions < "5.0"
1✔
1052
                cont.addVmmInjectedLabel()
1✔
1053
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
1054
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
1055

1✔
1056
                var tnTargetFilter string
1✔
1057
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
1058
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
1059
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
1060
                        }
×
1061
                } else {
1✔
1062
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
1063
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
1064
                }
1✔
1065
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
1066
                        tnTargetFilter)
1✔
1067
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
1068
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
1069

1✔
1070
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
1071
                        func(obj apicapi.ApicObject) bool {
1✔
1072
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
1073
                                return true
×
1074
                        },
×
1075
                        func(dn string) {
×
1076
                                cont.SubnetDeleted(dn)
×
1077
                        })
×
1078

1079
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
1080
                        []string{"opflexODev"}, "")
1✔
1081

1✔
1082
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
1083
                        func(obj apicapi.ApicObject) bool {
1✔
1084
                                cont.opflexDeviceChanged(obj)
×
1085
                                return true
×
1086
                        },
×
1087
                        func(dn string) {
×
1088
                                cont.opflexDeviceDeleted(dn)
×
1089
                        })
×
1090

1091
                cont.apicConn.VersionUpdateHook =
1✔
1092
                        func() {
1✔
1093
                                cont.initStaticServiceObjs()
×
1094
                        }
×
1095
        }
1096
        go cont.apicConn.Run(stopCh)
1✔
1097
}
1098

1099
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1100
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
1101
        ticker := time.NewTicker(seconds * time.Second)
1✔
1102
        defer ticker.Stop()
1✔
1103

1✔
1104
        for {
2✔
1105
                select {
1✔
1106
                case <-ticker.C:
1✔
1107
                        if cont.config.EnableOpflexAgentReconnect {
1✔
1108
                                cont.checkChangeOfOpflexOdevAciPod()
×
1109
                        }
×
1110
                        if cont.config.AciMultipod {
1✔
1111
                                cont.checkChangeOfOdevAciPod()
×
1112
                        }
×
1113
                case <-stopCh:
1✔
1114
                        return
1✔
1115
                }
1116
        }
1117
}
1118

1119
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1120
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
1121
        ticker := time.NewTicker(seconds * time.Second)
1✔
1122
        defer ticker.Stop()
1✔
1123

1✔
1124
        for {
2✔
1125
                select {
1✔
1126
                case <-ticker.C:
1✔
1127
                        cont.deleteOldOpflexDevices()
1✔
1128
                case <-stopCh:
1✔
1129
                        return
1✔
1130
                }
1131
        }
1132
}
1133

1134
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1135
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
1136
        ticker := time.NewTicker(seconds * time.Second)
1✔
1137
        defer ticker.Stop()
1✔
1138

1✔
1139
        for {
2✔
1140
                select {
1✔
1141
                case <-ticker.C:
1✔
1142
                        cont.processDelayedEpSlices()
1✔
1143
                case <-stopCh:
1✔
1144
                        return
1✔
1145
                }
1146
        }
1147
}
1148

1149
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
1150
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
1151
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
1152
        iteration := 0
1✔
1153
        for {
2✔
1154
                // To avoid noisy logs, only printing once in 5 minutes
1✔
1155
                if iteration%5 == 0 {
2✔
1156
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
1157
                }
1✔
1158
                var nodeInfos []*nodeinfo.NodeInfo
1✔
1159
                cont.indexMutex.Lock()
1✔
1160
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
1161
                        func(nodeInfoObj interface{}) {
2✔
1162
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
1163
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
1164
                        })
1✔
1165
                expectedmap := make(map[string]map[string]bool)
1✔
1166
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
1167
                        for nodename, entry := range glinfo {
2✔
1168
                                if _, found := expectedmap[nodename]; !found {
2✔
1169
                                        newentry := make(map[string]bool)
1✔
1170
                                        newentry[entry.SnatPolicyName] = true
1✔
1171
                                        expectedmap[nodename] = newentry
1✔
1172
                                } else {
2✔
1173
                                        currententry := expectedmap[nodename]
1✔
1174
                                        currententry[entry.SnatPolicyName] = true
1✔
1175
                                        expectedmap[nodename] = currententry
1✔
1176
                                }
1✔
1177
                        }
1178
                }
1179
                cont.indexMutex.Unlock()
1✔
1180

1✔
1181
                for _, value := range nodeInfos {
2✔
1182
                        marked := false
1✔
1183
                        policyNames := value.Spec.SnatPolicyNames
1✔
1184
                        nodeName := value.ObjectMeta.Name
1✔
1185
                        _, ok := expectedmap[nodeName]
1✔
1186
                        if !ok && len(policyNames) > 0 {
2✔
1187
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
1188
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
1189
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1190
                                marked = true
1✔
1191
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
1192
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1193
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1194
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1195
                                marked = true
1✔
1196
                        } else {
2✔
1197
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
1198
                                        // No snatpolicies present
×
1199
                                        continue
×
1200
                                }
1201
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
1202
                                if !eq {
2✔
1203
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1204
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1205
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1206
                                        marked = true
1✔
1207
                                }
1✔
1208
                        }
1209
                        if marked {
2✔
1210
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
1211
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
1212
                                if err != nil {
1✔
1213
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
1214
                                        continue
×
1215
                                }
1216
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1217
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1218
                        } else if iteration%5 == 0 {
2✔
1219
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1220
                        }
1✔
1221
                }
1222
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1223
                iteration++
1✔
1224
        }
1225
}
1226

1227
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1228
        queueStop <-chan struct{}) {
1✔
1229
        go wait.Until(func() {
2✔
1230
                for {
2✔
1231
                        syncType, quit := queue.Get()
1✔
1232
                        if quit {
2✔
1233
                                break
1✔
1234
                        }
1235
                        var requeue bool
1✔
1236
                        if sType, ok := syncType.(string); ok {
2✔
1237
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1238
                                        requeue = f()
1✔
1239
                                }
1✔
1240
                        }
1241
                        if requeue {
1✔
1242
                                queue.AddRateLimited(syncType)
×
1243
                        } else {
1✔
1244
                                queue.Forget(syncType)
1✔
1245
                        }
1✔
1246
                        queue.Done(syncType)
1✔
1247
                }
1248
        }, time.Second, queueStop)
1249
        <-queueStop
1✔
1250
        queue.ShutDown()
1✔
1251
}
1252

1253
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1254
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1255
}
1✔
1256
func (cont *AciController) scheduleRdConfig() {
×
1257
        cont.syncQueue.AddRateLimited("rdConfig")
×
1258
}
×
1259
func (cont *AciController) scheduleCreateIstioCR() {
×
1260
        cont.syncQueue.AddRateLimited("istioCR")
×
1261
}
×
1262

1263
func (cont *AciController) addVmmInjectedLabel() {
1✔
1264
        if apicapi.ApicVersion >= "5.2" {
1✔
1265
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1266
                if err != nil {
×
1267
                        panic(err.Error())
×
1268
                }
1269
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1270
                if err != nil {
×
1271
                        panic(err.Error())
×
1272
                }
1273
        }
1274
        if apicapi.ApicVersion >= "5.0" {
2✔
1275
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1276
                if err != nil {
1✔
1277
                        panic(err.Error())
×
1278
                }
1279
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1280
                if err != nil {
1✔
1281
                        panic(err.Error())
×
1282
                }
1283
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1284
                if err != nil {
1✔
1285
                        panic(err.Error())
×
1286
                }
1287
        }
1288
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc