• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 7030

08 Feb 2023 03:59AM UTC coverage: 56.705% (-0.3%) from 57.004%
7030

push

travis-ci-com

web-flow
Merge pull request #1067 from noironetworks/endointslice-update-fix-kmr2

Delete pbr programming of deleted endpoint right away

141 of 141 new or added lines in 2 files covered. (100.0%)

11866 of 20926 relevant lines covered (56.7%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.29
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "io/ioutil"
21
        "net"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        v1beta1 "k8s.io/api/discovery/v1beta1"
34
        "k8s.io/apimachinery/pkg/labels"
35
        "k8s.io/apimachinery/pkg/util/wait"
36
        "k8s.io/client-go/tools/cache"
37
        "k8s.io/client-go/util/workqueue"
38

39
        "github.com/noironetworks/aci-containers/pkg/apicapi"
40
        "github.com/noironetworks/aci-containers/pkg/index"
41
        "github.com/noironetworks/aci-containers/pkg/ipam"
42
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
43
        "github.com/noironetworks/aci-containers/pkg/metadata"
44
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
45
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
46
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
47
        "github.com/noironetworks/aci-containers/pkg/util"
48
        "k8s.io/client-go/kubernetes"
49
)
50

51
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
52
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
53
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
54

55
type AciController struct {
56
        log    *logrus.Logger
57
        config *ControllerConfig
58
        env    Environment
59

60
        defaultEg string
61
        defaultSg string
62

63
        unitTestMode bool
64

65
        podQueue          workqueue.RateLimitingInterface
66
        netPolQueue       workqueue.RateLimitingInterface
67
        qosQueue          workqueue.RateLimitingInterface
68
        serviceQueue      workqueue.RateLimitingInterface
69
        snatQueue         workqueue.RateLimitingInterface
70
        netflowQueue      workqueue.RateLimitingInterface
71
        erspanQueue       workqueue.RateLimitingInterface
72
        snatNodeInfoQueue workqueue.RateLimitingInterface
73
        rdConfigQueue     workqueue.RateLimitingInterface
74
        istioQueue        workqueue.RateLimitingInterface
75

76
        namespaceIndexer      cache.Indexer
77
        namespaceInformer     cache.Controller
78
        podIndexer            cache.Indexer
79
        podInformer           cache.Controller
80
        endpointsIndexer      cache.Indexer
81
        endpointsInformer     cache.Controller
82
        serviceIndexer        cache.Indexer
83
        serviceInformer       cache.Controller
84
        replicaSetIndexer     cache.Indexer
85
        replicaSetInformer    cache.Controller
86
        deploymentIndexer     cache.Indexer
87
        deploymentInformer    cache.Controller
88
        nodeIndexer           cache.Indexer
89
        nodeInformer          cache.Controller
90
        networkPolicyIndexer  cache.Indexer
91
        networkPolicyInformer cache.Controller
92
        snatIndexer           cache.Indexer
93
        snatInformer          cache.Controller
94
        snatNodeInfoIndexer   cache.Indexer
95
        snatNodeInformer      cache.Controller
96
        crdInformer           cache.Controller
97
        rdConfigInformer      cache.Controller
98
        rdConfigIndexer       cache.Indexer
99
        qosIndexer            cache.Indexer
100
        qosInformer           cache.Controller
101
        netflowIndexer        cache.Indexer
102
        netflowInformer       cache.Controller
103
        erspanIndexer         cache.Indexer
104
        erspanInformer        cache.Controller
105
        nodePodIfIndexer      cache.Indexer
106
        nodePodIfInformer     cache.Controller
107
        istioIndexer          cache.Indexer
108
        istioInformer         cache.Controller
109
        endpointSliceIndexer  cache.Indexer
110
        endpointSliceInformer cache.Controller
111
        snatCfgInformer       cache.Controller
112
        updatePod             podUpdateFunc
113
        updateNode            nodeUpdateFunc
114
        updateServiceStatus   serviceUpdateFunc
115

116
        indexMutex sync.Mutex
117

118
        configuredPodNetworkIps *netIps
119
        podNetworkIps           *netIps
120
        serviceIps              *ipam.IpCache
121
        staticServiceIps        *netIps
122
        nodeServiceIps          *netIps
123

124
        // index of pods matched by deployments
125
        depPods *index.PodSelectorIndex
126
        // index of pods matched by network policies
127
        netPolPods *index.PodSelectorIndex
128
        // index of pods matched by network policy ingress rules
129
        netPolIngressPods *index.PodSelectorIndex
130
        // index of pods matched by network policy egress rules
131
        netPolEgressPods *index.PodSelectorIndex
132
        // index of IP addresses contained in endpoints objects
133
        endpointsIpIndex cidranger.Ranger
134
        // index of service target ports
135
        targetPortIndex map[string]*portIndexEntry
136
        // index of ip blocks referenced by network policy egress rules
137
        netPolSubnetIndex cidranger.Ranger
138
        // index of pods matched by erspan policies
139
        erspanPolPods *index.PodSelectorIndex
140

141
        apicConn *apicapi.ApicConnection
142

143
        nodeServiceMetaCache map[string]*nodeServiceMeta
144
        nodeOpflexDevice     map[string]apicapi.ApicSlice
145
        nodePodNetCache      map[string]*nodePodNetMeta
146
        serviceMetaCache     map[string]*serviceMeta
147
        snatPolicyCache      map[string]*ContSnatPolicy
148
        delayedEpSlices      []*DelayedEpSlice
149
        snatServices         map[string]bool
150
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
151
        rdConfigCache        map[string]*rdConfig.RdConfig
152
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
153
        istioCache           map[string]*istiov1.AciIstioOperator
154
        podIftoEp            map[string]*EndPointData
155
        // Node Name and Policy Name
156
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
157
        nodeSyncEnabled     bool
158
        serviceSyncEnabled  bool
159
        snatSyncEnabled     bool
160
        tunnelGetter        *tunnelState
161
        syncQueue           workqueue.RateLimitingInterface
162
        syncProcessors      map[string]func() bool
163
        serviceEndPoints    ServiceEndPointType
164
        crdHandlers         map[string]func(*AciController, <-chan struct{})
165
        stopCh              <-chan struct{}
166
        //index of containerportname to ctrPortNameEntry
167
        ctrPortNameCache map[string]*ctrPortNameEntry
168
        // named networkPolicies
169
        nmPortNp map[string]bool
170
        // cache to look for Epg DNs which are bound to Vmm domain
171
        cachedEpgDns             []string
172
        vmmClusterFaultSupported bool
173
}
174

175
type DelayedEpSlice struct {
176
        ServiceKey  string
177
        OldEpSlice  *v1beta1.EndpointSlice
178
        NewEpSlice  *v1beta1.EndpointSlice
179
        DelayedTime time.Time
180
}
181

182
type nodeServiceMeta struct {
183
        serviceEp metadata.ServiceEndpoint
184
}
185

186
type nodePodNetMeta struct {
187
        nodePods            map[string]bool
188
        podNetIps           metadata.NetIps
189
        podNetIpsAnnotation string
190
}
191

192
// serviceMeta is the value type of AciController.serviceMetaCache and holds
// the IP addresses tracked for a service.
type serviceMeta struct {
	requestedIp      net.IP
	ingressIps       []net.IP
	staticIngressIps []net.IP
}
197

198
// ipIndexEntry pairs an IP network with a set of string keys. Its Network
// method exposes ipNet.
type ipIndexEntry struct {
	ipNet net.IPNet
	keys  map[string]bool
}
202

203
type targetPort struct {
204
        proto v1.Protocol
205
        ports []int
206
}
207

208
type portIndexEntry struct {
209
        port              targetPort
210
        serviceKeys       map[string]bool
211
        networkPolicyKeys map[string]bool
212
}
213

214
// portRangeSnat is a pair of integer bounds (start, end).
// NOTE(review): presumably an SNAT port range — confirm against its users.
type portRangeSnat struct {
	start, end int
}
218

219
// EndPointData holds PodIF data in controller.
// It is the value type of AciController.podIftoEp.
type EndPointData struct {
	MacAddr, EPG, Namespace, AppProfile string
}
226

227
// ctrPortNameEntry is the value type of AciController.ctrPortNameCache.
type ctrPortNameEntry struct {
	// Proto+port->pods
	ctrNmpToPods map[string]map[string]bool
}
231

232
type ServiceEndPointType interface {
233
        InitClientInformer(kubeClient *kubernetes.Clientset)
234
        Run(stopCh <-chan struct{})
235
        Wait(stopCh <-chan struct{})
236
        UpdateServicesForNode(nodename string)
237
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
238
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
239
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
240
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
241
}
242

243
type serviceEndpoint struct {
244
        cont *AciController
245
}
246
type serviceEndpointSlice struct {
247
        cont *AciController
248
}
249

250
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
251
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
252
}
×
253

254
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
255
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
256
}
×
257

258
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
259
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
260
}
1✔
261

262
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
263
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
264
}
1✔
265

266
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
267
        cache.WaitForCacheSync(stopCh,
1✔
268
                sep.cont.endpointsInformer.HasSynced,
1✔
269
                sep.cont.serviceInformer.HasSynced)
1✔
270
}
1✔
271

272
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
273
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
274
        cache.WaitForCacheSync(stopCh,
1✔
275
                seps.cont.endpointSliceInformer.HasSynced,
1✔
276
                seps.cont.serviceInformer.HasSynced)
1✔
277
}
1✔
278

279
func (e *ipIndexEntry) Network() net.IPNet {
1✔
280
        return e.ipNet
1✔
281
}
1✔
282

283
func newNodePodNetMeta() *nodePodNetMeta {
1✔
284
        return &nodePodNetMeta{
1✔
285
                nodePods: make(map[string]bool),
1✔
286
        }
1✔
287
}
1✔
288

289
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
290
        return workqueue.NewNamedRateLimitingQueue(
1✔
291
                workqueue.NewMaxOfRateLimiter(
1✔
292
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
293
                                10*time.Second),
1✔
294
                        &workqueue.BucketRateLimiter{
1✔
295
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
296
                        },
1✔
297
                ),
1✔
298
                "delta")
1✔
299
}
1✔
300

301
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
302
        cont := &AciController{
1✔
303
                log:          log,
1✔
304
                config:       config,
1✔
305
                env:          env,
1✔
306
                defaultEg:    "",
1✔
307
                defaultSg:    "",
1✔
308
                unitTestMode: unittestmode,
1✔
309

1✔
310
                podQueue:          createQueue("pod"),
1✔
311
                netPolQueue:       createQueue("networkPolicy"),
1✔
312
                qosQueue:          createQueue("qos"),
1✔
313
                netflowQueue:      createQueue("netflow"),
1✔
314
                erspanQueue:       createQueue("erspan"),
1✔
315
                serviceQueue:      createQueue("service"),
1✔
316
                snatQueue:         createQueue("snat"),
1✔
317
                snatNodeInfoQueue: createQueue("snatnodeinfo"),
1✔
318
                rdConfigQueue:     createQueue("rdconfig"),
1✔
319
                istioQueue:        createQueue("istio"),
1✔
320
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
321
                        &workqueue.BucketRateLimiter{
1✔
322
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
323
                        }, "sync"),
1✔
324

1✔
325
                configuredPodNetworkIps: newNetIps(),
1✔
326
                podNetworkIps:           newNetIps(),
1✔
327
                serviceIps:              ipam.NewIpCache(),
1✔
328
                staticServiceIps:        newNetIps(),
1✔
329
                nodeServiceIps:          newNetIps(),
1✔
330

1✔
331
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
332

1✔
333
                nodeServiceMetaCache: make(map[string]*nodeServiceMeta),
1✔
334
                nodePodNetCache:      make(map[string]*nodePodNetMeta),
1✔
335
                serviceMetaCache:     make(map[string]*serviceMeta),
1✔
336
                snatPolicyCache:      make(map[string]*ContSnatPolicy),
1✔
337
                snatServices:         make(map[string]bool),
1✔
338
                snatNodeInfoCache:    make(map[string]*nodeinfo.NodeInfo),
1✔
339
                rdConfigCache:        make(map[string]*rdConfig.RdConfig),
1✔
340
                rdConfigSubnetCache:  make(map[string]*rdConfig.RdConfigSpec),
1✔
341
                podIftoEp:            make(map[string]*EndPointData),
1✔
342
                snatGlobalInfoCache:  make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
343
                istioCache:           make(map[string]*istiov1.AciIstioOperator),
1✔
344
                crdHandlers:          make(map[string]func(*AciController, <-chan struct{})),
1✔
345
                ctrPortNameCache:     make(map[string]*ctrPortNameEntry),
1✔
346
                nmPortNp:             make(map[string]bool),
1✔
347
        }
1✔
348
        cont.syncProcessors = map[string]func() bool{
1✔
349
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
350
                "rdConfig":       cont.syncRdConfig,
1✔
351
                "istioCR":        cont.createIstioCR,
1✔
352
        }
1✔
353
        return cont
1✔
354
}
1✔
355

356
func (cont *AciController) Init() {
×
357
        if cont.config.LBType != lbTypeAci {
×
358
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedNwPol")
×
359
                if err != nil {
×
360
                        panic(err.Error())
×
361
                }
362
        }
363

364
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
365
        if err != nil {
×
366
                cont.log.Error("Could not serialize default endpoint group")
×
367
                panic(err.Error())
×
368
        }
369
        cont.defaultEg = string(egdata)
×
370

×
371
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
372
        if err != nil {
×
373
                cont.log.Error("Could not serialize default security groups")
×
374
                panic(err.Error())
×
375
        }
376
        cont.defaultSg = string(sgdata)
×
377

×
378
        cont.log.Debug("Initializing IPAM")
×
379
        cont.initIpam()
×
380
        // check if the cluster supports endpoint slices
×
381
        // if cluster doesn't have the support fallback to endpoints
×
382
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
383
        if util.IsEndPointSlicesSupported(kubeClient) {
×
384
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
385
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
386
                cont.log.Info("Initializing ServiceEndpointSlices")
×
387
        } else {
×
388
                cont.serviceEndPoints = &serviceEndpoint{}
×
389
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
390
                cont.log.Info("Initializing ServiceEndpoints")
×
391
        }
×
392

393
        err = cont.env.Init(cont)
×
394
        if err != nil {
×
395
                panic(err.Error())
×
396
        }
397
}
398

399
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
400
        store cache.Store, handler func(interface{}) bool,
401
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
402
        go wait.Until(func() {
2✔
403
                for {
2✔
404
                        key, quit := queue.Get()
1✔
405
                        if quit {
2✔
406
                                break
1✔
407
                        }
408

409
                        var requeue bool
1✔
410
                        switch key := key.(type) {
1✔
411
                        case chan struct{}:
×
412
                                close(key)
×
413
                        case string:
1✔
414
                                obj, exists, err := store.GetByKey(key)
1✔
415
                                if err != nil {
1✔
416
                                        cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
417
                                }
×
418
                                //Handle Add/Update/Delete
419
                                if exists && handler != nil {
2✔
420
                                        requeue = handler(obj)
1✔
421
                                }
1✔
422
                                //Handle Post Delete
423
                                if !exists && postDelHandler != nil {
1✔
424
                                        requeue = postDelHandler()
×
425
                                }
×
426
                        }
427
                        if requeue {
2✔
428
                                queue.AddRateLimited(key)
1✔
429
                        } else {
2✔
430
                                queue.Forget(key)
1✔
431
                        }
1✔
432
                        queue.Done(key)
1✔
433

434
                }
435
        }, time.Second, stopCh)
436
        <-stopCh
1✔
437
        queue.ShutDown()
1✔
438
}
439

440
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
441
        return apicapi.ApicSlice{}
1✔
442
}
1✔
443

444
func (cont *AciController) aciNameForKey(ktype string, key string) string {
1✔
445
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
446
}
1✔
447

448
func (cont *AciController) initStaticObjs() {
1✔
449
        cont.env.InitStaticAciObjects()
1✔
450
        cont.apicConn.WriteApicObjects(cont.config.AciPrefix+"_static",
1✔
451
                cont.globalStaticObjs())
1✔
452
}
1✔
453

454
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
455
        vmmProv = "Kubernetes"
1✔
456
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
457
                vmmProv = "OpenShift"
×
458
        }
×
459
        return
1✔
460
}
461

462
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
463
        var err error
1✔
464
        var privKey []byte
1✔
465
        var apicCert []byte
1✔
466

1✔
467
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
468

1✔
469
        if cont.config.ApicPrivateKeyPath != "" {
1✔
470
                privKey, err = ioutil.ReadFile(cont.config.ApicPrivateKeyPath)
×
471
                if err != nil {
×
472
                        panic(err)
×
473
                }
474
        }
475
        if cont.config.ApicCertPath != "" {
1✔
476
                apicCert, err = ioutil.ReadFile(cont.config.ApicCertPath)
×
477
                if err != nil {
×
478
                        panic(err)
×
479
                }
480
        }
481
        // If not defined, default is 1800
482
        if cont.config.ApicRefreshTimer == "" {
2✔
483
                cont.config.ApicRefreshTimer = "1800"
1✔
484
        }
1✔
485
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
486
        if err != nil {
1✔
487
                panic(err)
×
488
        }
489
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
490

1✔
491
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
492
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
493
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
494
                panic(err)
×
495
        }
496

497
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
498
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
499
                cont.config.ApicRefreshTickerAdjust = "150"
1✔
500
        }
1✔
501
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
502
        if err != nil {
1✔
503
                panic(err)
×
504
        }
505

506
        //If ApicSubscriptionDelay is not defined, default to 100ms
507
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
508
                cont.config.ApicSubscriptionDelay = 100
1✔
509
        }
1✔
510
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
511

1✔
512
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
513
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
514
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
515
        }
1✔
516

517
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
518
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
519
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
520
        }
1✔
521

522
        // If not defined, default to 32
523
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
524
                cont.config.PodIpPoolChunkSize = 32
1✔
525
        }
1✔
526
        cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
527

1✔
528
        // If not valid, default to 5000-65000
1✔
529
        // other permissible values 1-65000
1✔
530
        defStart := 5000
1✔
531
        defEnd := 65000
1✔
532
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
533
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
534
        }
1✔
535
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
536
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
537
        }
1✔
538
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
539
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
540
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
541
                cont.config.SnatDefaultPortRangeStart = defStart
×
542
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
543
        }
×
544

545
        // Set default value for pbr programming delay if services list is not empty
546
        // and delay value is empty
547
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
548
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
549
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
550
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
551
        }
×
552
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
553
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
554
        }
×
555

556
        // Set contract scope for snat svc graph to global by default
557
        if cont.config.SnatSvcContractScope == "" {
2✔
558
                cont.config.SnatSvcContractScope = "global"
1✔
559
        }
1✔
560
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
561
                cont.config.MaxSvcGraphNodes = 32
1✔
562
        }
1✔
563
        cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
564

1✔
565
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
566
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
567
                privKey, apicCert, cont.config.AciPrefix,
1✔
568
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay)
1✔
569
        if err != nil {
1✔
570
                panic(err)
×
571
        }
572

573
        if len(cont.config.ApicHosts) != 0 {
1✔
574
                cont.log.WithFields(logrus.Fields{
×
575
                        "mod":  "APICAPI",
×
576
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
577
                }).Debug("Connecting to APIC to determine the Version")
×
578

×
579
                version, err := cont.apicConn.GetVersion()
×
580
                if err != nil {
×
581
                        cont.log.Error("Could not get APIC version")
×
582
                        panic(err)
×
583
                }
584
                cont.apicConn.CachedVersion = version
×
585
                apicapi.ApicVersion = version
×
586
                if version >= "4.2(4i)" {
×
587
                        cont.apicConn.SnatPbrFltrChain = true
×
588
                } else {
×
589
                        cont.apicConn.SnatPbrFltrChain = false
×
590
                }
×
591
                if version >= "5.2" {
×
592
                        cont.vmmClusterFaultSupported = true
×
593
                }
×
594

595
        } else { // For unit-tests
1✔
596
                cont.apicConn.SnatPbrFltrChain = true
1✔
597
        }
1✔
598

599
        cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
600
        // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
601
        if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
602
                var expectedVrfRelations []string
×
603
                expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
604
                cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
605
                err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
606
                if err != nil {
×
607
                        cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
608
                        panic(err)
×
609
                }
610
        }
611

612
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported {
1✔
613
                //Clear fault instances when the controller starts
×
614
                cont.clearFaultInstances()
×
615
                //Subscribe for vmmEpPD for a given domain
×
616
                var tnTargetFilterEpg string
×
617
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
618
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
619
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
620
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
621
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
622
                        func(obj apicapi.ApicObject) bool {
×
623
                                cont.vmmEpPDChanged(obj)
×
624
                                return true
×
625
                        },
×
626
                        func(dn string) {
×
627
                                cont.vmmEpPDDeleted(dn)
×
628
                        })
×
629

630
        }
631

632
        cont.initStaticObjs()
1✔
633
        err = cont.env.PrepareRun(stopCh)
1✔
634
        if err != nil {
1✔
635
                panic(err.Error())
×
636
        }
637

638
        cont.apicConn.FullSyncHook = func() {
1✔
639
                // put a channel into each work queue and wait on it to
×
640
                // checkpoint object syncing in response to new subscription
×
641
                // updates
×
642
                cont.log.Debug("Starting checkpoint")
×
643
                var chans []chan struct{}
×
644
                qs := make([]workqueue.RateLimitingInterface, 0)
×
645
                _, ok := cont.env.(*K8sEnvironment)
×
646
                if ok {
×
647
                        qs = []workqueue.RateLimitingInterface{
×
648
                                cont.podQueue, cont.netPolQueue, cont.qosQueue,
×
649
                                cont.serviceQueue, cont.snatQueue, cont.netflowQueue,
×
650
                                cont.snatNodeInfoQueue, cont.rdConfigQueue, cont.erspanQueue,
×
651
                        }
×
652
                }
×
653
                for _, q := range qs {
×
654
                        c := make(chan struct{})
×
655
                        chans = append(chans, c)
×
656
                        q.Add(c)
×
657
                }
×
658
                for _, c := range chans {
×
659
                        <-c
×
660
                }
×
661
                cont.log.Debug("Checkpoint complete")
×
662
        }
663

664
        if len(cont.config.ApicHosts) != 0 {
1✔
665
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
666
                cont.scheduleRdConfig()
×
667
        }
×
668

669
        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
670
                []string{"hostprotPol"})
1✔
671
        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
672
                []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
673
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
674
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"})
1✔
675
        cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
676
                cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
677
                []string{"fvRsCons"})
1✔
678
        vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
679
                cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
680
                cont.config.AciVmmController)
1✔
681
        // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
682
        // Since it is not supported for APIC versions < "5.0"
1✔
683
        cont.addVmmInjectedLabel()
1✔
684
        cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
685
                []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
686

1✔
687
        var tnTargetFilter string
1✔
688
        if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
689
                for _, tn := range cont.config.AciVrfRelatedTenants {
×
690
                        tnTargetFilter += fmt.Sprintf("tn-%s|", tn)
×
691
                }
×
692
        } else {
1✔
693
                tnTargetFilter += fmt.Sprintf("tn-%s|tn-%s",
1✔
694
                        cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
695
        }
1✔
696
        subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
697
                tnTargetFilter)
1✔
698
        cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
699
                []string{"fvSubnet"}, subnetTargetFilter)
1✔
700

1✔
701
        cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
702
                func(obj apicapi.ApicObject) bool {
1✔
703
                        cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
704
                        return true
×
705
                },
×
706
                func(dn string) {
×
707
                        cont.SubnetDeleted(dn)
×
708
                })
×
709

710
        cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
711
                []string{"opflexODev"}, "")
1✔
712

1✔
713
        cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
714
                func(obj apicapi.ApicObject) bool {
1✔
715
                        cont.opflexDeviceChanged(obj)
×
716
                        return true
×
717
                },
×
718
                func(dn string) {
×
719
                        cont.opflexDeviceDeleted(dn)
×
720
                })
×
721
        go cont.apicConn.Run(stopCh)
1✔
722
}
723

724
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
725
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
726
        ticker := time.NewTicker(seconds * time.Second)
1✔
727
        defer ticker.Stop()
1✔
728

1✔
729
        for {
2✔
730
                select {
1✔
731
                case <-ticker.C:
1✔
732
                        cont.deleteOldOpflexDevices()
1✔
733
                case <-stopCh:
1✔
734
                        return
1✔
735
                }
736
        }
737
}
738

739
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
740
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
741
        ticker := time.NewTicker(seconds * time.Second)
1✔
742
        defer ticker.Stop()
1✔
743

1✔
744
        for {
2✔
745
                select {
1✔
746
                case <-ticker.C:
1✔
747
                        cont.processDelayedEpSlices()
1✔
748
                case <-stopCh:
1✔
749
                        return
1✔
750
                }
751
        }
752
}
753

754
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
755
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
756
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
757
        iteration := 0
1✔
758
        for {
2✔
759
                // To avoid noisy logs, only printing once in 5 minutes
1✔
760
                if iteration%5 == 0 {
2✔
761
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
762
                }
1✔
763
                var nodeInfos []*nodeinfo.NodeInfo
1✔
764
                cont.indexMutex.Lock()
1✔
765
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
766
                        func(nodeInfoObj interface{}) {
2✔
767
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
768
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
769
                        })
1✔
770
                expectedmap := make(map[string]map[string]bool)
1✔
771
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
772
                        for nodename, entry := range glinfo {
2✔
773
                                if _, found := expectedmap[nodename]; !found {
2✔
774
                                        newentry := make(map[string]bool)
1✔
775
                                        newentry[entry.SnatPolicyName] = true
1✔
776
                                        expectedmap[nodename] = newentry
1✔
777
                                } else {
2✔
778
                                        currententry := expectedmap[nodename]
1✔
779
                                        currententry[entry.SnatPolicyName] = true
1✔
780
                                        expectedmap[nodename] = currententry
1✔
781
                                }
1✔
782
                        }
783
                }
784
                cont.indexMutex.Unlock()
1✔
785

1✔
786
                for _, value := range nodeInfos {
2✔
787
                        marked := false
1✔
788
                        policyNames := value.Spec.SnatPolicyNames
1✔
789
                        nodeName := value.ObjectMeta.Name
1✔
790
                        _, ok := expectedmap[nodeName]
1✔
791
                        if !ok && len(policyNames) > 0 {
2✔
792
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
793
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
794
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
795
                                marked = true
1✔
796
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
797
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
798
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
799
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
800
                                marked = true
1✔
801
                        } else {
2✔
802
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
803
                                        // No snatpolicies present
×
804
                                        continue
×
805
                                }
806
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
807
                                if !eq {
2✔
808
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
809
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
810
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
811
                                        marked = true
1✔
812
                                }
1✔
813
                        }
814
                        if marked {
2✔
815
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
816
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
817
                                if err != nil {
1✔
818
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
819
                                        continue
×
820
                                }
821
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
822
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
823
                        } else {
1✔
824
                                if iteration%5 == 0 {
2✔
825
                                        cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
826
                                }
1✔
827
                        }
828
                }
829
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
830
                iteration = iteration + 1
1✔
831
        }
832
}
833

834
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
835
        queueStop <-chan struct{}) {
1✔
836

1✔
837
        go wait.Until(func() {
2✔
838
                for {
2✔
839
                        syncType, quit := queue.Get()
1✔
840
                        if quit {
2✔
841
                                break
1✔
842
                        }
843
                        var requeue bool
1✔
844
                        switch syncType := syncType.(type) {
1✔
845
                        case string:
1✔
846
                                if f, ok := cont.syncProcessors[syncType]; ok {
2✔
847
                                        requeue = f()
1✔
848
                                }
1✔
849
                        }
850
                        if requeue {
1✔
851
                                queue.AddRateLimited(syncType)
×
852
                        } else {
1✔
853
                                queue.Forget(syncType)
1✔
854
                        }
1✔
855
                        queue.Done(syncType)
1✔
856

857
                }
858
        }, time.Second, queueStop)
859
        <-queueStop
1✔
860
        queue.ShutDown()
1✔
861
}
862

863
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
864
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
865
}
1✔
866
func (cont *AciController) scheduleRdConfig() {
×
867
        cont.syncQueue.AddRateLimited("rdConfig")
×
868
}
×
869
func (cont *AciController) scheduleCreateIstioCR() {
×
870
        cont.syncQueue.AddRateLimited("istioCR")
×
871
}
×
872

873
func (cont *AciController) addVmmInjectedLabel() {
1✔
874
        if apicapi.ApicVersion >= "5.2" {
1✔
875
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
876
                if err != nil {
×
877
                        panic(err.Error())
×
878
                }
879
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
880
                if err != nil {
×
881
                        panic(err.Error())
×
882
                }
883
        }
884
        if apicapi.ApicVersion >= "5.0" {
1✔
885
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
×
886
                if err != nil {
×
887
                        panic(err.Error())
×
888
                }
889
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
×
890
                if err != nil {
×
891
                        panic(err.Error())
×
892
                }
893
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
×
894
                if err != nil {
×
895
                        panic(err.Error())
×
896
                }
897
        }
898
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc