• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 9570

12 Aug 2024 05:02PM UTC coverage: 70.797% (-0.3%) from 71.095%
9570

push

travis-pro

web-flow
Merge pull request #1360 from noironetworks/typo-fix

Typo fix in unknown-mac-unicast-action

13057 of 18443 relevant lines covered (70.8%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.73
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        v1net "k8s.io/api/networking/v1"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/util/workqueue"
40

41
        "github.com/noironetworks/aci-containers/pkg/apicapi"
42
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
43
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
44
        hppv1 "github.com/noironetworks/aci-containers/pkg/hpp/apis/aci.hpp/v1"
45
        "github.com/noironetworks/aci-containers/pkg/index"
46
        "github.com/noironetworks/aci-containers/pkg/ipam"
47
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
48
        "github.com/noironetworks/aci-containers/pkg/metadata"
49
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
50
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
51
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
52
        "github.com/noironetworks/aci-containers/pkg/util"
53
)
54

55
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
56
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
57
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
58
type listNetworkPoliciesFunc func(string) (*v1net.NetworkPolicyList, error)
59
type listNamespacesFunc func() (*v1.NamespaceList, error)
60

61
type AciController struct {
62
        log    *logrus.Logger
63
        config *ControllerConfig
64
        env    Environment
65

66
        defaultEg string
67
        defaultSg string
68

69
        unitTestMode bool
70

71
        podQueue            workqueue.RateLimitingInterface
72
        netPolQueue         workqueue.RateLimitingInterface
73
        qosQueue            workqueue.RateLimitingInterface
74
        serviceQueue        workqueue.RateLimitingInterface
75
        snatQueue           workqueue.RateLimitingInterface
76
        netflowQueue        workqueue.RateLimitingInterface
77
        erspanQueue         workqueue.RateLimitingInterface
78
        snatNodeInfoQueue   workqueue.RateLimitingInterface
79
        rdConfigQueue       workqueue.RateLimitingInterface
80
        istioQueue          workqueue.RateLimitingInterface
81
        nodeFabNetAttQueue  workqueue.RateLimitingInterface
82
        netFabConfigQueue   workqueue.RateLimitingInterface
83
        nadVlanMapQueue     workqueue.RateLimitingInterface
84
        fabricVlanPoolQueue workqueue.RateLimitingInterface
85
        netFabL3ConfigQueue workqueue.RateLimitingInterface
86
        remIpContQueue      workqueue.RateLimitingInterface
87

88
        namespaceIndexer                     cache.Indexer
89
        namespaceInformer                    cache.Controller
90
        podIndexer                           cache.Indexer
91
        podInformer                          cache.Controller
92
        endpointsIndexer                     cache.Indexer
93
        endpointsInformer                    cache.Controller
94
        serviceIndexer                       cache.Indexer
95
        serviceInformer                      cache.Controller
96
        replicaSetIndexer                    cache.Indexer
97
        replicaSetInformer                   cache.Controller
98
        deploymentIndexer                    cache.Indexer
99
        deploymentInformer                   cache.Controller
100
        nodeIndexer                          cache.Indexer
101
        nodeInformer                         cache.Controller
102
        networkPolicyIndexer                 cache.Indexer
103
        networkPolicyInformer                cache.Controller
104
        snatIndexer                          cache.Indexer
105
        snatInformer                         cache.Controller
106
        snatNodeInfoIndexer                  cache.Indexer
107
        snatNodeInformer                     cache.Controller
108
        crdInformer                          cache.Controller
109
        rdConfigInformer                     cache.Controller
110
        rdConfigIndexer                      cache.Indexer
111
        qosIndexer                           cache.Indexer
112
        qosInformer                          cache.Controller
113
        netflowIndexer                       cache.Indexer
114
        netflowInformer                      cache.Controller
115
        erspanIndexer                        cache.Indexer
116
        erspanInformer                       cache.Controller
117
        nodePodIfIndexer                     cache.Indexer
118
        nodePodIfInformer                    cache.Controller
119
        istioIndexer                         cache.Indexer
120
        istioInformer                        cache.Controller
121
        endpointSliceIndexer                 cache.Indexer
122
        endpointSliceInformer                cache.Controller
123
        snatCfgInformer                      cache.Controller
124
        updatePod                            podUpdateFunc
125
        updateNode                           nodeUpdateFunc
126
        updateServiceStatus                  serviceUpdateFunc
127
        listNetworkPolicies                  listNetworkPoliciesFunc
128
        listNamespaces                       listNamespacesFunc
129
        nodeFabNetAttInformer                cache.SharedIndexInformer
130
        netFabConfigInformer                 cache.SharedIndexInformer
131
        nadVlanMapInformer                   cache.SharedIndexInformer
132
        fabricVlanPoolInformer               cache.SharedIndexInformer
133
        networkFabricL3ConfigurationInformer cache.SharedIndexInformer
134
        fabNetAttClient                      *fabattclset.Clientset
135

136
        indexMutex sync.Mutex
137

138
        configuredPodNetworkIps *netIps
139
        podNetworkIps           *netIps
140
        serviceIps              *ipam.IpCache
141
        staticServiceIps        *netIps
142
        nodeServiceIps          *netIps
143

144
        // index of pods matched by deployments
145
        depPods *index.PodSelectorIndex
146
        // index of pods matched by network policies
147
        netPolPods *index.PodSelectorIndex
148
        // index of pods matched by network policy ingress rules
149
        netPolIngressPods *index.PodSelectorIndex
150
        // index of pods matched by network policy egress rules
151
        netPolEgressPods *index.PodSelectorIndex
152
        // index of IP addresses contained in endpoints objects
153
        endpointsIpIndex cidranger.Ranger
154
        // index of service target ports
155
        targetPortIndex map[string]*portIndexEntry
156
        // index of ip blocks referenced by network policy egress rules
157
        netPolSubnetIndex cidranger.Ranger
158
        // index of pods matched by erspan policies
159
        erspanPolPods *index.PodSelectorIndex
160

161
        apicConn *apicapi.ApicConnection
162

163
        nodeServiceMetaCache map[string]*nodeServiceMeta
164
        nodeACIPod           map[string]aciPodAnnot
165
        nodeACIPodAnnot      map[string]aciPodAnnot
166
        nodeOpflexDevice     map[string]apicapi.ApicSlice
167
        nodePodNetCache      map[string]*nodePodNetMeta
168
        serviceMetaCache     map[string]*serviceMeta
169
        snatPolicyCache      map[string]*ContSnatPolicy
170
        delayedEpSlices      []*DelayedEpSlice
171
        snatServices         map[string]bool
172
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
173
        rdConfigCache        map[string]*rdConfig.RdConfig
174
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
175
        istioCache           map[string]*istiov1.AciIstioOperator
176
        podIftoEp            map[string]*EndPointData
177
        // Node Name and Policy Name
178
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
179
        nodeSyncEnabled     bool
180
        serviceSyncEnabled  bool
181
        snatSyncEnabled     bool
182
        syncQueue           workqueue.RateLimitingInterface
183
        syncProcessors      map[string]func() bool
184
        serviceEndPoints    ServiceEndPointType
185
        crdHandlers         map[string]func(*AciController, <-chan struct{})
186
        stopCh              <-chan struct{}
187
        //index of containerportname to ctrPortNameEntry
188
        ctrPortNameCache map[string]*ctrPortNameEntry
189
        // named networkPolicies
190
        nmPortNp map[string]bool
191
        //maps network policy hash to hpp
192
        hppRef map[string]hppReference
193
        //map for ns to remoteIpCont
194
        nsRemoteIpCont map[string]remoteIpCont
195
        // cache to look for Epg DNs which are bound to Vmm domain
196
        cachedEpgDns             []string
197
        vmmClusterFaultSupported bool
198
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
199
        //Used in Shared mode
200
        sharedEncapCache       map[int]*sharedEncapData
201
        sharedEncapAepCache    map[string]map[int]bool
202
        sharedEncapSviCache    map[int]*NfL3Data
203
        sharedEncapVrfCache    map[string]*NfVrfData
204
        sharedEncapTenantCache map[string]*NfTenantData
205
        nfl3configGenerationId int64
206
        // vlan to propertiesList
207
        sharedEncapNfcCache         map[int]*NfcData
208
        sharedEncapNfcVlanMap       map[int]*NfcData
209
        sharedEncapNfcLabelMap      map[string]*NfcData
210
        sharedEncapNfcAppProfileMap map[string]map[int]bool
211
        // nadVlanMap encapLabel to vlan
212
        sharedEncapLabelMap      map[string][]int
213
        lldpIfCache              map[string]string
214
        globalVlanConfig         globalVlanConfig
215
        fabricVlanPoolMap        map[string]map[string]string
216
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
217
        openStackSystemId        string
218
}
219

220
type NfL3OutData struct {
221
        // +kubebuilder:validation:Enum:"import"
222
        RtCtrl     string
223
        PodId      int
224
        RtrNodeMap map[int]*fabattv1.FabricL3OutRtrNode
225
        ExtEpgMap  map[string]*fabattv1.PolicyPrefixGroup
226
        SviMap     map[int]bool
227
}
228

229
type NfTenantData struct {
230
        CommonTenant     bool
231
        L3OutConfig      map[string]*NfL3OutData
232
        BGPPeerPfxConfig map[string]*fabattv1.BGPPeerPrefixPolicy
233
}
234

235
type NfVrfData struct {
236
        TenantConfig map[string]*NfTenantData
237
}
238

239
type NfL3Networks struct {
240
        fabattv1.PrimaryNetwork
241
        Subnets map[string]*fabattv1.FabricL3Subnet
242
}
243

244
type NfL3Data struct {
245
        Tenant      string
246
        Vrf         fabattv1.VRF
247
        PodId       int
248
        ConnectedNw *NfL3Networks
249
        NetAddr     map[string]*RoutedNetworkData
250
        Nodes       map[int]fabattv1.FabricL3OutNode
251
}
252

253
// remoteIpCont maps ip to pod labels
254
type remoteIpCont map[string]map[string]string
255

256
type NfcData struct {
257
        Aeps map[string]bool
258
        Epg  fabattv1.Epg
259
}
260

261
type sharedEncapData struct {
262
        //node to NAD to pods
263
        Pods   map[string]map[string][]string
264
        NetRef map[string]*AdditionalNetworkMeta
265
        Aeps   map[string]bool
266
}
267

268
type globalVlanConfig struct {
269
        SharedPhysDom apicapi.ApicObject
270
        SharedL3Dom   apicapi.ApicObject
271
}
272

273
type hppReference struct {
274
        RefCount uint              `json:"ref-count,omitempty"`
275
        Npkeys   []string          `json:"npkeys,omitempty"`
276
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
277
        HppCr    hppv1.HostprotPol `json:"hpp-cr,omitempty"`
278
}
279

280
type DelayedEpSlice struct {
281
        ServiceKey  string
282
        OldEpSlice  *discovery.EndpointSlice
283
        NewEpSlice  *discovery.EndpointSlice
284
        DelayedTime time.Time
285
}
286

287
type aciPodAnnot struct {
288
        aciPod             string
289
        isSingleOpflexOdev bool
290
        disconnectTime     time.Time
291
        connectTime        time.Time
292
}
293

294
type nodeServiceMeta struct {
295
        serviceEp metadata.ServiceEndpoint
296
}
297

298
type nodePodNetMeta struct {
299
        nodePods            map[string]bool
300
        podNetIps           metadata.NetIps
301
        podNetIpsAnnotation string
302
}
303

304
type openstackOpflexOdevInfo struct {
305
        opflexODevDn map[string]struct{}
306
        fabricPathDn string
307
}
308

309
type serviceMeta struct {
310
        requestedIps     []net.IP
311
        ingressIps       []net.IP
312
        staticIngressIps []net.IP
313
}
314

315
type ipIndexEntry struct {
316
        ipNet net.IPNet
317
        keys  map[string]bool
318
}
319

320
type targetPort struct {
321
        proto v1.Protocol
322
        ports []int
323
}
324

325
type portIndexEntry struct {
326
        port              targetPort
327
        serviceKeys       map[string]bool
328
        networkPolicyKeys map[string]bool
329
}
330

331
type portRangeSnat struct {
332
        start int
333
        end   int
334
}
335

336
// EndPointData holds PodIF data in controller.
337
type EndPointData struct {
338
        MacAddr    string
339
        EPG        string
340
        Namespace  string
341
        AppProfile string
342
}
343

344
type ctrPortNameEntry struct {
345
        // Proto+port->pods
346
        ctrNmpToPods map[string]map[string]bool
347
}
348

349
type LinkData struct {
350
        Link []string
351
        Pods []string
352
}
353

354
type RoutedNodeData struct {
355
        addr string
356
        idx  int
357
}
358

359
type RoutedNetworkData struct {
360
        subnet       string
361
        netAddress   string
362
        maskLen      int
363
        numAllocated int
364
        maxAddresses int
365
        baseAddress  net.IP
366
        nodeMap      map[string]RoutedNodeData
367
        availableMap map[int]bool
368
}
369

370
type AdditionalNetworkMeta struct {
371
        NetworkName string
372
        EncapVlan   string
373
        //node+localiface->fabricLinks
374
        FabricLink map[string]map[string]LinkData
375
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
376
        Mode       util.EncapMode
377
}
378

379
type ServiceEndPointType interface {
380
        InitClientInformer(kubeClient *kubernetes.Clientset)
381
        Run(stopCh <-chan struct{})
382
        Wait(stopCh <-chan struct{})
383
        UpdateServicesForNode(nodename string)
384
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
385
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
386
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
387
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
388
}
389

390
type serviceEndpoint struct {
391
        cont *AciController
392
}
393
type serviceEndpointSlice struct {
394
        cont *AciController
395
}
396

397
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
398
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
399
}
×
400

401
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
402
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
403
}
×
404

405
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
406
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
407
}
1✔
408

409
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
410
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
411
}
1✔
412

413
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
414
        cache.WaitForCacheSync(stopCh,
1✔
415
                sep.cont.endpointsInformer.HasSynced,
1✔
416
                sep.cont.serviceInformer.HasSynced)
1✔
417
}
1✔
418

419
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
420
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
421
        cache.WaitForCacheSync(stopCh,
1✔
422
                seps.cont.endpointSliceInformer.HasSynced,
1✔
423
                seps.cont.serviceInformer.HasSynced)
1✔
424
}
1✔
425

426
func (e *ipIndexEntry) Network() net.IPNet {
1✔
427
        return e.ipNet
1✔
428
}
1✔
429

430
func newNodePodNetMeta() *nodePodNetMeta {
1✔
431
        return &nodePodNetMeta{
1✔
432
                nodePods: make(map[string]bool),
1✔
433
        }
1✔
434
}
1✔
435

436
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
437
        return workqueue.NewNamedRateLimitingQueue(
1✔
438
                workqueue.NewMaxOfRateLimiter(
1✔
439
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
440
                                10*time.Second),
1✔
441
                        &workqueue.BucketRateLimiter{
1✔
442
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
443
                        },
1✔
444
                ),
1✔
445
                "delta")
1✔
446
}
1✔
447

448
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
449
        cont := &AciController{
1✔
450
                log:          log,
1✔
451
                config:       config,
1✔
452
                env:          env,
1✔
453
                defaultEg:    "",
1✔
454
                defaultSg:    "",
1✔
455
                unitTestMode: unittestmode,
1✔
456

1✔
457
                podQueue:            createQueue("pod"),
1✔
458
                netPolQueue:         createQueue("networkPolicy"),
1✔
459
                qosQueue:            createQueue("qos"),
1✔
460
                netflowQueue:        createQueue("netflow"),
1✔
461
                erspanQueue:         createQueue("erspan"),
1✔
462
                serviceQueue:        createQueue("service"),
1✔
463
                snatQueue:           createQueue("snat"),
1✔
464
                snatNodeInfoQueue:   createQueue("snatnodeinfo"),
1✔
465
                rdConfigQueue:       createQueue("rdconfig"),
1✔
466
                istioQueue:          createQueue("istio"),
1✔
467
                nodeFabNetAttQueue:  createQueue("nodefabricnetworkattachment"),
1✔
468
                netFabConfigQueue:   createQueue("networkfabricconfiguration"),
1✔
469
                nadVlanMapQueue:     createQueue("nadvlanmap"),
1✔
470
                fabricVlanPoolQueue: createQueue("fabricvlanpool"),
1✔
471
                netFabL3ConfigQueue: createQueue("networkfabricl3configuration"),
1✔
472
                remIpContQueue:      createQueue("remoteIpContainer"),
1✔
473
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
474
                        &workqueue.BucketRateLimiter{
1✔
475
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
476
                        }, "sync"),
1✔
477

1✔
478
                configuredPodNetworkIps: newNetIps(),
1✔
479
                podNetworkIps:           newNetIps(),
1✔
480
                serviceIps:              ipam.NewIpCache(),
1✔
481
                staticServiceIps:        newNetIps(),
1✔
482
                nodeServiceIps:          newNetIps(),
1✔
483

1✔
484
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
485
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
486
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
487

1✔
488
                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
1✔
489
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
1✔
490
                serviceMetaCache:            make(map[string]*serviceMeta),
1✔
491
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
1✔
492
                snatServices:                make(map[string]bool),
1✔
493
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
1✔
494
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
1✔
495
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
1✔
496
                podIftoEp:                   make(map[string]*EndPointData),
1✔
497
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
498
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
1✔
499
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
1✔
500
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
1✔
501
                nmPortNp:                    make(map[string]bool),
1✔
502
                hppRef:                      make(map[string]hppReference),
1✔
503
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
1✔
504
                sharedEncapCache:            make(map[int]*sharedEncapData),
1✔
505
                sharedEncapAepCache:         make(map[string]map[int]bool),
1✔
506
                sharedEncapSviCache:         make(map[int]*NfL3Data),
1✔
507
                sharedEncapVrfCache:         make(map[string]*NfVrfData),
1✔
508
                sharedEncapTenantCache:      make(map[string]*NfTenantData),
1✔
509
                sharedEncapNfcCache:         make(map[int]*NfcData),
1✔
510
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
1✔
511
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
1✔
512
                sharedEncapNfcAppProfileMap: make(map[string]map[int]bool),
1✔
513
                sharedEncapLabelMap:         make(map[string][]int),
1✔
514
                lldpIfCache:                 make(map[string]string),
1✔
515
                fabricVlanPoolMap:           make(map[string]map[string]string),
1✔
516
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
1✔
517
                nsRemoteIpCont:              make(map[string]remoteIpCont),
1✔
518
        }
1✔
519
        cont.syncProcessors = map[string]func() bool{
1✔
520
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
521
                "rdConfig":       cont.syncRdConfig,
1✔
522
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
523
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
524
                   and aci-containers-operator images for istio.io/istio package
1✔
525

1✔
526
                "istioCR":        cont.createIstioCR,
1✔
527
                */
1✔
528
        }
1✔
529
        return cont
1✔
530
}
1✔
531

532
func (cont *AciController) Init() {
×
533
        if cont.config.ChainedMode {
×
534
                cont.log.Info("In chained mode")
×
535
        }
×
536

537
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
538
        if err != nil {
×
539
                cont.log.Error("Could not serialize default endpoint group")
×
540
                panic(err.Error())
×
541
        }
542
        cont.defaultEg = string(egdata)
×
543

×
544
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
545
        if err != nil {
×
546
                cont.log.Error("Could not serialize default security groups")
×
547
                panic(err.Error())
×
548
        }
549
        cont.defaultSg = string(sgdata)
×
550

×
551
        cont.log.Debug("Initializing IPAM")
×
552
        cont.initIpam()
×
553
        // check if the cluster supports endpoint slices
×
554
        // if cluster doesn't have the support fallback to endpoints
×
555
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
556
        if util.IsEndPointSlicesSupported(kubeClient) {
×
557
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
558
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
559
                cont.log.Info("Initializing ServiceEndpointSlices")
×
560
        } else {
×
561
                cont.serviceEndPoints = &serviceEndpoint{}
×
562
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
563
                cont.log.Info("Initializing ServiceEndpoints")
×
564
        }
×
565

566
        err = cont.env.Init(cont)
×
567
        if err != nil {
×
568
                panic(err.Error())
×
569
        }
570
}
571

572
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
573
        store cache.Store, handler func(interface{}) bool,
574
        deleteHandler func(string) bool,
575
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
576
        go wait.Until(func() {
2✔
577
                for {
2✔
578
                        key, quit := queue.Get()
1✔
579
                        if quit {
2✔
580
                                break
1✔
581
                        }
582

583
                        var requeue bool
1✔
584
                        switch key := key.(type) {
1✔
585
                        case chan struct{}:
×
586
                                close(key)
×
587
                        case string:
1✔
588
                                if strings.HasPrefix(key, "DELETED_") {
2✔
589
                                        delKey := strings.Trim(key, "DELETED_")
1✔
590
                                        requeue = deleteHandler(delKey)
1✔
591
                                } else {
2✔
592
                                        obj, exists, err := store.GetByKey(key)
1✔
593
                                        if err != nil {
1✔
594
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
595
                                        }
×
596
                                        //Handle Add/Update/Delete
597
                                        if exists && handler != nil {
2✔
598
                                                requeue = handler(obj)
1✔
599
                                        }
1✔
600
                                        //Handle Post Delete
601
                                        if !exists && postDelHandler != nil {
1✔
602
                                                requeue = postDelHandler()
×
603
                                        }
×
604
                                }
605
                        }
606
                        if requeue {
2✔
607
                                queue.AddRateLimited(key)
1✔
608
                        } else {
2✔
609
                                queue.Forget(key)
1✔
610
                        }
1✔
611
                        queue.Done(key)
1✔
612
                }
613
        }, time.Second, stopCh)
614
        <-stopCh
1✔
615
        queue.ShutDown()
1✔
616
}
617

618
func (cont *AciController) processRemIpContQueue(queue workqueue.RateLimitingInterface,
619
        handler func(interface{}) bool,
620
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
621
        go wait.Until(func() {
2✔
622
                for {
2✔
623
                        key, quit := queue.Get()
1✔
624
                        if quit {
2✔
625
                                break
1✔
626
                        }
627

628
                        var requeue bool
1✔
629
                        switch key := key.(type) {
1✔
630
                        case chan struct{}:
×
631
                                close(key)
×
632
                        case string:
1✔
633
                                if handler != nil {
2✔
634
                                        requeue = handler(key)
1✔
635
                                }
1✔
636
                                if postDelHandler != nil {
2✔
637
                                        requeue = postDelHandler()
1✔
638
                                }
1✔
639
                        }
640
                        if requeue {
1✔
641
                                queue.AddRateLimited(key)
×
642
                        } else {
1✔
643
                                queue.Forget(key)
1✔
644
                        }
1✔
645
                        queue.Done(key)
1✔
646

647
                }
648
        }, time.Second, stopCh)
649
        <-stopCh
1✔
650
        queue.ShutDown()
1✔
651
}
652

653
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
654
        return apicapi.ApicSlice{}
1✔
655
}
1✔
656

657
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
658
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
659
}
1✔
660

661
func (cont *AciController) initStaticObjs() {
1✔
662
        cont.env.InitStaticAciObjects()
1✔
663
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
664
                cont.globalStaticObjs())
1✔
665
}
1✔
666

667
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
668
        vmmProv = "Kubernetes"
1✔
669
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
670
                vmmProv = "OpenShift"
×
671
        }
×
672
        return
1✔
673
}
674

675
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
676
        var err error
1✔
677
        var privKey []byte
1✔
678
        var apicCert []byte
1✔
679

1✔
680
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
681

1✔
682
        if cont.config.ApicPrivateKeyPath != "" {
1✔
683
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
684
                if err != nil {
×
685
                        panic(err)
×
686
                }
687
        }
688
        if cont.config.ApicCertPath != "" {
1✔
689
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
690
                if err != nil {
×
691
                        panic(err)
×
692
                }
693
        }
694
        // If not defined, default is 1800
695
        if cont.config.ApicRefreshTimer == "" {
2✔
696
                cont.config.ApicRefreshTimer = "1800"
1✔
697
        }
1✔
698
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
699
        if err != nil {
1✔
700
                panic(err)
×
701
        }
702
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
703

1✔
704
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
705
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
706
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
707
                panic(err)
×
708
        }
709

710
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
711
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
712
                cont.config.ApicRefreshTickerAdjust = "150"
1✔
713
        }
1✔
714
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
715
        if err != nil {
1✔
716
                panic(err)
×
717
        }
718

719
        //If ApicSubscriptionDelay is not defined, default to 100ms
720
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
721
                cont.config.ApicSubscriptionDelay = 100
1✔
722
        }
1✔
723
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
724

1✔
725
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
726
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
727
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
728
        }
1✔
729

730
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
731
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
732
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
733
        }
1✔
734
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
735

1✔
736
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
737
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
738
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
739
        }
1✔
740

741
        // If not defined, default to 32
742
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
743
                cont.config.PodIpPoolChunkSize = 32
1✔
744
        }
1✔
745
        if !cont.config.ChainedMode {
2✔
746
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
747
        }
1✔
748

749
        // If ApicConnectionRetryLimit is not defined, default to 5
750
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
751
                cont.config.ApicConnectionRetryLimit = 5
1✔
752
        }
1✔
753
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
754

1✔
755
        // If not valid, default to 5000-65000
1✔
756
        // other permissible values 1-65000
1✔
757
        defStart := 5000
1✔
758
        defEnd := 65000
1✔
759
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
760
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
761
        }
1✔
762
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
763
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
764
        }
1✔
765
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
766
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
767
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
768
                cont.config.SnatDefaultPortRangeStart = defStart
×
769
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
770
        }
×
771

772
        // Set default value for pbr programming delay if services list is not empty
773
        // and delay value is empty
774
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
775
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
776
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
777
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
778
        }
×
779
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
780
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
781
        }
×
782

783
        // Set contract scope for snat svc graph to global by default
784
        if cont.config.SnatSvcContractScope == "" {
2✔
785
                cont.config.SnatSvcContractScope = "global"
1✔
786
        }
1✔
787
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
788
                cont.config.MaxSvcGraphNodes = 32
1✔
789
        }
1✔
790
        if !cont.config.ChainedMode {
2✔
791
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
792
        }
1✔
793
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
794
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
795
                privKey, apicCert, cont.config.AciPrefix,
1✔
796
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
797
                cont.config.AciVrfTenant)
1✔
798
        if err != nil {
1✔
799
                panic(err)
×
800
        }
801

802
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
803

1✔
804
        if len(cont.config.ApicHosts) != 0 {
1✔
805
        APIC_SWITCH:
×
806
                cont.log.WithFields(logrus.Fields{
×
807
                        "mod":  "APICAPI",
×
808
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
809
                }).Debug("Connecting to APIC to determine the Version")
×
810

×
811
                version, err := cont.apicConn.GetVersion()
×
812
                if err != nil {
×
813
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
814
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
815
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
816
                        goto APIC_SWITCH
×
817
                }
818
                cont.apicConn.CachedVersion = version
×
819
                apicapi.ApicVersion = version
×
820
                if version >= "4.2(4i)" {
×
821
                        cont.apicConn.SnatPbrFltrChain = true
×
822
                } else {
×
823
                        cont.apicConn.SnatPbrFltrChain = false
×
824
                }
×
825
                if version >= "5.2" {
×
826
                        cont.vmmClusterFaultSupported = true
×
827
                }
×
828
        } else { // For unit-tests
1✔
829
                cont.apicConn.SnatPbrFltrChain = true
1✔
830
        }
1✔
831

832
        if !cont.config.ChainedMode {
2✔
833
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
834
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
835
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
836
                        var expectedVrfRelations []string
×
837
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
838
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
839
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
840
                        if err != nil {
×
841
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
842
                                panic(err)
×
843
                        }
844
                }
845
        }
846

847
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.config.ChainedMode {
1✔
848
                //Clear fault instances when the controller starts
×
849
                cont.clearFaultInstances()
×
850
                //Subscribe for vmmEpPD for a given domain
×
851
                var tnTargetFilterEpg string
×
852
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
853
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
854
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
855
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
856
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
857
                        func(obj apicapi.ApicObject) bool {
×
858
                                cont.vmmEpPDChanged(obj)
×
859
                                return true
×
860
                        },
×
861
                        func(dn string) {
×
862
                                cont.vmmEpPDDeleted(dn)
×
863
                        })
×
864
        }
865

866
        cont.initStaticObjs()
1✔
867

1✔
868
        err = cont.env.PrepareRun(stopCh)
1✔
869
        if err != nil {
1✔
870
                panic(err.Error())
×
871
        }
872

873
        cont.apicConn.FullSyncHook = func() {
1✔
874
                // put a channel into each work queue and wait on it to
×
875
                // checkpoint object syncing in response to new subscription
×
876
                // updates
×
877
                cont.log.Debug("Starting checkpoint")
×
878
                var chans []chan struct{}
×
879
                qs := make([]workqueue.RateLimitingInterface, 0)
×
880
                _, ok := cont.env.(*K8sEnvironment)
×
881
                if ok {
×
882
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
883
                        if !cont.config.ChainedMode {
×
884
                                if !cont.config.DisableHppRendering {
×
885
                                        qs = append(qs, cont.netPolQueue)
×
886
                                }
×
887
                                if cont.config.EnableHppDirect {
×
888
                                        qs = append(qs, cont.remIpContQueue)
×
889
                                }
×
890
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
891
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
892
                                        cont.rdConfigQueue, cont.erspanQueue)
×
893
                        }
894
                }
895
                for _, q := range qs {
×
896
                        c := make(chan struct{})
×
897
                        chans = append(chans, c)
×
898
                        q.Add(c)
×
899
                }
×
900
                for _, c := range chans {
×
901
                        <-c
×
902
                }
×
903
                cont.log.Debug("Checkpoint complete")
×
904
        }
905

906
        if len(cont.config.ApicHosts) != 0 && !cont.config.ChainedMode {
1✔
907
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
908
                cont.scheduleRdConfig()
×
909
                if strings.Contains(cont.config.Flavor, "openstack") {
×
910
                        cont.setOpenStackSystemId()
×
911
                }
×
912
        }
913

914
        if !cont.config.ChainedMode {
2✔
915
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
916
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
917
                                []string{"hostprotPol"})
1✔
918
                }
1✔
919
        } else {
1✔
920
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
921
                        []string{"fvBD", "fvAp"})
1✔
922
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
923
                        []string{"fvnsVlanInstP"}, "")
1✔
924
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
925
                        []string{"infraRsDomP"}, "")
1✔
926
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
927
                        []string{"physDomP"}, "")
1✔
928
                cont.apicConn.AddSubscriptionClass("l3extDomP",
1✔
929
                        []string{"l3extDomP"}, "")
1✔
930
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
931
                        []string{"infraRsVlanNs"}, "")
1✔
932
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
933
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
934
                cont.apicConn.AddSubscriptionClass("l3extOut",
1✔
935
                        []string{"l3extInstP", "l3extSubnet", "fvRsCons", "fvRsProv", "l3extRsEctx", "l3extRsL3DomAtt", "l3extLNodeP", "l3extRsNodeL3OutAtt", "ipRouteP", "ipNexthopP", "l3extLIfP", "l3extVirtualLIfP", "l3extRsDynPathAtt",
1✔
936
                                "l3extRsPathL3OutAtt", "l3extMember", "l3extIp", "bgpExtP", "bgpPeerP", "bgpAsP", "bgpLocalAsnP", "bgpRsPeerPfxPol"}, "")
1✔
937
                cont.apicConn.AddSubscriptionClass("bgpPeerPfxPol",
1✔
938
                        []string{"bgpPeerPfxPol"}, "")
1✔
939
        }
1✔
940
        if !cont.config.ChainedMode {
2✔
941
                // When a new class is added for subscriptio, check if its name attribute
1✔
942
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
943
                // in apicapi.go
1✔
944
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
945
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
946
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
947
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
948
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
949
                }
×
950
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
951
                        subscribeMo)
1✔
952
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
953
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
954
                        []string{"fvRsCons"})
1✔
955
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
956
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
957
                        cont.config.AciVmmController)
1✔
958
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
959
                // Since it is not supported for APIC versions < "5.0"
1✔
960
                cont.addVmmInjectedLabel()
1✔
961
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
962
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
963

1✔
964
                var tnTargetFilter string
1✔
965
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
966
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
967
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
968
                        }
×
969
                } else {
1✔
970
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
971
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
972
                }
1✔
973
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
974
                        tnTargetFilter)
1✔
975
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
976
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
977

1✔
978
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
979
                        func(obj apicapi.ApicObject) bool {
1✔
980
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
981
                                return true
×
982
                        },
×
983
                        func(dn string) {
×
984
                                cont.SubnetDeleted(dn)
×
985
                        })
×
986

987
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
988
                        []string{"opflexODev"}, "")
1✔
989

1✔
990
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
991
                        func(obj apicapi.ApicObject) bool {
1✔
992
                                cont.opflexDeviceChanged(obj)
×
993
                                return true
×
994
                        },
×
995
                        func(dn string) {
×
996
                                cont.opflexDeviceDeleted(dn)
×
997
                        })
×
998

999
                cont.apicConn.VersionUpdateHook =
1✔
1000
                        func() {
1✔
1001
                                cont.initStaticServiceObjs()
×
1002
                        }
×
1003
        }
1004
        go cont.apicConn.Run(stopCh)
1✔
1005
}
1006

1007
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1008
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
1009
        ticker := time.NewTicker(seconds * time.Second)
1✔
1010
        defer ticker.Stop()
1✔
1011

1✔
1012
        for {
2✔
1013
                select {
1✔
1014
                case <-ticker.C:
1✔
1015
                        if cont.config.EnableOpflexAgentReconnect {
1✔
1016
                                cont.checkChangeOfOpflexOdevAciPod()
×
1017
                        }
×
1018
                        if cont.config.AciMultipod {
1✔
1019
                                cont.checkChangeOfOdevAciPod()
×
1020
                        }
×
1021
                case <-stopCh:
1✔
1022
                        return
1✔
1023
                }
1024
        }
1025
}
1026

1027
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1028
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
1029
        ticker := time.NewTicker(seconds * time.Second)
1✔
1030
        defer ticker.Stop()
1✔
1031

1✔
1032
        for {
2✔
1033
                select {
1✔
1034
                case <-ticker.C:
1✔
1035
                        cont.deleteOldOpflexDevices()
1✔
1036
                case <-stopCh:
1✔
1037
                        return
1✔
1038
                }
1039
        }
1040
}
1041

1042
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1043
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
1044
        ticker := time.NewTicker(seconds * time.Second)
1✔
1045
        defer ticker.Stop()
1✔
1046

1✔
1047
        for {
2✔
1048
                select {
1✔
1049
                case <-ticker.C:
1✔
1050
                        cont.processDelayedEpSlices()
1✔
1051
                case <-stopCh:
1✔
1052
                        return
1✔
1053
                }
1054
        }
1055
}
1056

1057
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
1058
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
1059
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
1060
        iteration := 0
1✔
1061
        for {
2✔
1062
                // To avoid noisy logs, only printing once in 5 minutes
1✔
1063
                if iteration%5 == 0 {
2✔
1064
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
1065
                }
1✔
1066
                var nodeInfos []*nodeinfo.NodeInfo
1✔
1067
                cont.indexMutex.Lock()
1✔
1068
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
1069
                        func(nodeInfoObj interface{}) {
2✔
1070
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
1071
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
1072
                        })
1✔
1073
                expectedmap := make(map[string]map[string]bool)
1✔
1074
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
1075
                        for nodename, entry := range glinfo {
2✔
1076
                                if _, found := expectedmap[nodename]; !found {
2✔
1077
                                        newentry := make(map[string]bool)
1✔
1078
                                        newentry[entry.SnatPolicyName] = true
1✔
1079
                                        expectedmap[nodename] = newentry
1✔
1080
                                } else {
2✔
1081
                                        currententry := expectedmap[nodename]
1✔
1082
                                        currententry[entry.SnatPolicyName] = true
1✔
1083
                                        expectedmap[nodename] = currententry
1✔
1084
                                }
1✔
1085
                        }
1086
                }
1087
                cont.indexMutex.Unlock()
1✔
1088

1✔
1089
                for _, value := range nodeInfos {
2✔
1090
                        marked := false
1✔
1091
                        policyNames := value.Spec.SnatPolicyNames
1✔
1092
                        nodeName := value.ObjectMeta.Name
1✔
1093
                        _, ok := expectedmap[nodeName]
1✔
1094
                        if !ok && len(policyNames) > 0 {
2✔
1095
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
1096
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
1097
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1098
                                marked = true
1✔
1099
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
1100
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1101
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1102
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1103
                                marked = true
1✔
1104
                        } else {
2✔
1105
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
1106
                                        // No snatpolicies present
×
1107
                                        continue
×
1108
                                }
1109
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
1110
                                if !eq {
2✔
1111
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1112
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1113
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1114
                                        marked = true
1✔
1115
                                }
1✔
1116
                        }
1117
                        if marked {
2✔
1118
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
1119
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
1120
                                if err != nil {
1✔
1121
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
1122
                                        continue
×
1123
                                }
1124
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1125
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1126
                        } else if iteration%5 == 0 {
2✔
1127
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1128
                        }
1✔
1129
                }
1130
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1131
                iteration++
1✔
1132
        }
1133
}
1134

1135
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1136
        queueStop <-chan struct{}) {
1✔
1137
        go wait.Until(func() {
2✔
1138
                for {
2✔
1139
                        syncType, quit := queue.Get()
1✔
1140
                        if quit {
2✔
1141
                                break
1✔
1142
                        }
1143
                        var requeue bool
1✔
1144
                        if sType, ok := syncType.(string); ok {
2✔
1145
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1146
                                        requeue = f()
1✔
1147
                                }
1✔
1148
                        }
1149
                        if requeue {
1✔
1150
                                queue.AddRateLimited(syncType)
×
1151
                        } else {
1✔
1152
                                queue.Forget(syncType)
1✔
1153
                        }
1✔
1154
                        queue.Done(syncType)
1✔
1155
                }
1156
        }, time.Second, queueStop)
1157
        <-queueStop
1✔
1158
        queue.ShutDown()
1✔
1159
}
1160

1161
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1162
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1163
}
1✔
1164
func (cont *AciController) scheduleRdConfig() {
×
1165
        cont.syncQueue.AddRateLimited("rdConfig")
×
1166
}
×
1167
func (cont *AciController) scheduleCreateIstioCR() {
×
1168
        cont.syncQueue.AddRateLimited("istioCR")
×
1169
}
×
1170

1171
func (cont *AciController) addVmmInjectedLabel() {
1✔
1172
        if apicapi.ApicVersion >= "5.2" {
1✔
1173
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1174
                if err != nil {
×
1175
                        panic(err.Error())
×
1176
                }
1177
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1178
                if err != nil {
×
1179
                        panic(err.Error())
×
1180
                }
1181
        }
1182
        if apicapi.ApicVersion >= "5.0" {
2✔
1183
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1184
                if err != nil {
1✔
1185
                        panic(err.Error())
×
1186
                }
1187
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1188
                if err != nil {
1✔
1189
                        panic(err.Error())
×
1190
                }
1191
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1192
                if err != nil {
1✔
1193
                        panic(err.Error())
×
1194
                }
1195
        }
1196
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc