• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 10728

18 Jun 2025 11:17PM UTC coverage: 68.581% (-0.09%) from 68.669%
10728

push

travis-pro

web-flow
Merge pull request #1537 from noironetworks/backport_upgrade_fixes

Make usage of systemid in CNO optional

14 of 33 new or added lines in 2 files covered. (42.42%)

375 existing lines in 5 files now uncovered.

13326 of 19431 relevant lines covered (68.58%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.89
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        v1net "k8s.io/api/networking/v1"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/util/workqueue"
40

41
        "github.com/noironetworks/aci-containers/pkg/apicapi"
42
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
43
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
44
        hppv1 "github.com/noironetworks/aci-containers/pkg/hpp/apis/aci.hpp/v1"
45
        "github.com/noironetworks/aci-containers/pkg/index"
46
        "github.com/noironetworks/aci-containers/pkg/ipam"
47
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
48
        "github.com/noironetworks/aci-containers/pkg/metadata"
49
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
50
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
51
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
52
        "github.com/noironetworks/aci-containers/pkg/util"
53
)
54

55
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
56
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
57
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
58
type listNetworkPoliciesFunc func(string) (*v1net.NetworkPolicyList, error)
59
type listNamespacesFunc func() (*v1.NamespaceList, error)
60

61
type AciController struct {
62
        log    *logrus.Logger
63
        config *ControllerConfig
64
        env    Environment
65

66
        defaultEg string
67
        defaultSg string
68

69
        unitTestMode bool
70

71
        podQueue              workqueue.RateLimitingInterface
72
        netPolQueue           workqueue.RateLimitingInterface
73
        qosQueue              workqueue.RateLimitingInterface
74
        serviceQueue          workqueue.RateLimitingInterface
75
        snatQueue             workqueue.RateLimitingInterface
76
        netflowQueue          workqueue.RateLimitingInterface
77
        erspanQueue           workqueue.RateLimitingInterface
78
        snatNodeInfoQueue     workqueue.RateLimitingInterface
79
        rdConfigQueue         workqueue.RateLimitingInterface
80
        istioQueue            workqueue.RateLimitingInterface
81
        nodeFabNetAttQueue    workqueue.RateLimitingInterface
82
        netFabConfigQueue     workqueue.RateLimitingInterface
83
        nadVlanMapQueue       workqueue.RateLimitingInterface
84
        fabricVlanPoolQueue   workqueue.RateLimitingInterface
85
        netFabL3ConfigQueue   workqueue.RateLimitingInterface
86
        remIpContQueue        workqueue.RateLimitingInterface
87
        epgDnCacheUpdateQueue workqueue.RateLimitingInterface
88

89
        namespaceIndexer                     cache.Indexer
90
        namespaceInformer                    cache.Controller
91
        podIndexer                           cache.Indexer
92
        podInformer                          cache.Controller
93
        endpointsIndexer                     cache.Indexer
94
        endpointsInformer                    cache.Controller
95
        serviceIndexer                       cache.Indexer
96
        serviceInformer                      cache.Controller
97
        replicaSetIndexer                    cache.Indexer
98
        replicaSetInformer                   cache.Controller
99
        deploymentIndexer                    cache.Indexer
100
        deploymentInformer                   cache.Controller
101
        nodeIndexer                          cache.Indexer
102
        nodeInformer                         cache.Controller
103
        networkPolicyIndexer                 cache.Indexer
104
        networkPolicyInformer                cache.Controller
105
        snatIndexer                          cache.Indexer
106
        snatInformer                         cache.Controller
107
        snatNodeInfoIndexer                  cache.Indexer
108
        snatNodeInformer                     cache.Controller
109
        snatLocalInfoInformer                cache.Controller
110
        crdInformer                          cache.Controller
111
        rdConfigInformer                     cache.Controller
112
        rdConfigIndexer                      cache.Indexer
113
        qosIndexer                           cache.Indexer
114
        qosInformer                          cache.Controller
115
        netflowIndexer                       cache.Indexer
116
        netflowInformer                      cache.Controller
117
        erspanIndexer                        cache.Indexer
118
        erspanInformer                       cache.Controller
119
        nodePodIfIndexer                     cache.Indexer
120
        nodePodIfInformer                    cache.Controller
121
        istioIndexer                         cache.Indexer
122
        istioInformer                        cache.Controller
123
        endpointSliceIndexer                 cache.Indexer
124
        endpointSliceInformer                cache.Controller
125
        snatCfgInformer                      cache.Controller
126
        updatePod                            podUpdateFunc
127
        updateNode                           nodeUpdateFunc
128
        updateServiceStatus                  serviceUpdateFunc
129
        listNetworkPolicies                  listNetworkPoliciesFunc
130
        listNamespaces                       listNamespacesFunc
131
        nodeFabNetAttInformer                cache.SharedIndexInformer
132
        netFabConfigInformer                 cache.SharedIndexInformer
133
        nadVlanMapInformer                   cache.SharedIndexInformer
134
        fabricVlanPoolInformer               cache.SharedIndexInformer
135
        networkFabricL3ConfigurationInformer cache.SharedIndexInformer
136
        fabNetAttClient                      *fabattclset.Clientset
137
        proactiveConfInformer                cache.SharedIndexInformer
138

139
        indexMutex sync.Mutex
140
        hppMutex   sync.Mutex
141

142
        configuredPodNetworkIps *netIps
143
        podNetworkIps           *netIps
144
        serviceIps              *ipam.IpCache
145
        staticServiceIps        *netIps
146
        nodeServiceIps          *netIps
147

148
        // index of pods matched by deployments
149
        depPods *index.PodSelectorIndex
150
        // index of pods matched by network policies
151
        netPolPods *index.PodSelectorIndex
152
        // index of pods matched by network policy ingress rules
153
        netPolIngressPods *index.PodSelectorIndex
154
        // index of pods matched by network policy egress rules
155
        netPolEgressPods *index.PodSelectorIndex
156
        // index of IP addresses contained in endpoints objects
157
        endpointsIpIndex cidranger.Ranger
158
        // index of service target ports
159
        targetPortIndex map[string]*portIndexEntry
160
        // index of ip blocks referenced by network policy egress rules
161
        netPolSubnetIndex cidranger.Ranger
162
        // index of pods matched by erspan policies
163
        erspanPolPods *index.PodSelectorIndex
164

165
        apicConn *apicapi.ApicConnection
166

167
        nodeServiceMetaCache map[string]*nodeServiceMeta
168
        nodeACIPod           map[string]aciPodAnnot
169
        nodeACIPodAnnot      map[string]aciPodAnnot
170
        nodeOpflexDevice     map[string]apicapi.ApicSlice
171
        nodePodNetCache      map[string]*nodePodNetMeta
172
        serviceMetaCache     map[string]*serviceMeta
173
        snatPolicyCache      map[string]*ContSnatPolicy
174
        delayedEpSlices      []*DelayedEpSlice
175
        snatServices         map[string]bool
176
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
177
        rdConfigCache        map[string]*rdConfig.RdConfig
178
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
179
        istioCache           map[string]*istiov1.AciIstioOperator
180
        podIftoEp            map[string]*EndPointData
181
        // Node Name and Policy Name
182
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
183
        nodeSyncEnabled     bool
184
        serviceSyncEnabled  bool
185
        snatSyncEnabled     bool
186
        syncQueue           workqueue.RateLimitingInterface
187
        syncProcessors      map[string]func() bool
188
        serviceEndPoints    ServiceEndPointType
189
        crdHandlers         map[string]func(*AciController, <-chan struct{})
190
        stopCh              <-chan struct{}
191
        //index of containerportname to ctrPortNameEntry
192
        ctrPortNameCache map[string]*ctrPortNameEntry
193
        // named networkPolicies
194
        nmPortNp map[string]bool
195
        //maps network policy hash to hpp
196
        hppRef map[string]hppReference
197
        //map for ns to remoteIpConts
198
        nsRemoteIpCont map[string]remoteIpConts
199
        // cache to look for Epg DNs which are bound to Vmm domain
200
        cachedEpgDns             []string
201
        vmmClusterFaultSupported bool
202
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
203
        //Used in Shared mode
204
        sharedEncapCache       map[int]*sharedEncapData
205
        sharedEncapAepCache    map[string]map[int]bool
206
        sharedEncapSviCache    map[int]*NfL3Data
207
        sharedEncapVrfCache    map[string]*NfVrfData
208
        sharedEncapTenantCache map[string]*NfTenantData
209
        nfl3configGenerationId int64
210
        // vlan to propertiesList
211
        sharedEncapNfcCache         map[int]*NfcData
212
        sharedEncapNfcVlanMap       map[int]*NfcData
213
        sharedEncapNfcLabelMap      map[string]*NfcData
214
        sharedEncapNfcAppProfileMap map[string]bool
215
        // nadVlanMap encapLabel to vlan
216
        sharedEncapLabelMap      map[string][]int
217
        lldpIfCache              map[string]*NfLLDPIfData
218
        globalVlanConfig         globalVlanConfig
219
        fabricVlanPoolMap        map[string]map[string]string
220
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
221
        hostFabricPathDnMap      map[string]hostFabricInfo
222
        openStackSystemId        string
223
}
224

225
type hostFabricInfo struct {
226
        fabricPathDn string
227
        host         string
228
        vpcIfDn      map[string]struct{}
229
}
230

231
type NfLLDPIfData struct {
232
        LLDPIf string
233
        // As of now, manage at the NAD level
234
        // more granular introduces intf tracking complexities
235
        // for not sufficient benefits
236
        Refs map[string]bool
237
}
238

239
type NfL3OutData struct {
240
        // +kubebuilder:validation:Enum:"import"
241
        RtCtrl     string
242
        PodId      int
243
        RtrNodeMap map[int]*fabattv1.FabricL3OutRtrNode
244
        ExtEpgMap  map[string]*fabattv1.PolicyPrefixGroup
245
        SviMap     map[int]bool
246
}
247

248
type NfTenantData struct {
249
        CommonTenant     bool
250
        L3OutConfig      map[string]*NfL3OutData
251
        BGPPeerPfxConfig map[string]*fabattv1.BGPPeerPrefixPolicy
252
}
253

254
type NfVrfData struct {
255
        TenantConfig map[string]*NfTenantData
256
}
257

258
type NfL3Networks struct {
259
        fabattv1.PrimaryNetwork
260
        Subnets map[string]*fabattv1.FabricL3Subnet
261
}
262

263
type NfL3Data struct {
264
        Tenant      string
265
        Vrf         fabattv1.VRF
266
        PodId       int
267
        ConnectedNw *NfL3Networks
268
        NetAddr     map[string]*RoutedNetworkData
269
        Nodes       map[int]fabattv1.FabricL3OutNode
270
}
271

272
// maps pod name to remoteIpCont
273
type remoteIpConts map[string]remoteIpCont
274

275
// remoteIpCont maps ip to pod labels
276
type remoteIpCont map[string]map[string]string
277

278
type NfcData struct {
279
        Aeps map[string]bool
280
        Epg  fabattv1.Epg
281
}
282

283
type sharedEncapData struct {
284
        //node to NAD to pods
285
        Pods   map[string]map[string][]string
286
        NetRef map[string]*AdditionalNetworkMeta
287
        Aeps   map[string]bool
288
}
289

290
type globalVlanConfig struct {
291
        SharedPhysDom apicapi.ApicObject
292
        SharedL3Dom   apicapi.ApicObject
293
}
294

295
type hppReference struct {
296
        RefCount uint              `json:"ref-count,omitempty"`
297
        Npkeys   []string          `json:"npkeys,omitempty"`
298
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
299
        HppCr    hppv1.HostprotPol `json:"hpp-cr,omitempty"`
300
}
301

302
type DelayedEpSlice struct {
303
        ServiceKey  string
304
        OldEpSlice  *discovery.EndpointSlice
305
        NewEpSlice  *discovery.EndpointSlice
306
        DelayedTime time.Time
307
}
308

309
type aciPodAnnot struct {
310
        aciPod             string
311
        isSingleOpflexOdev bool
312
        disconnectTime     time.Time
313
        connectTime        time.Time
314
        lastErrorTime      time.Time
315
}
316

317
type nodeServiceMeta struct {
318
        serviceEp metadata.ServiceEndpoint
319
}
320

321
type nodePodNetMeta struct {
322
        nodePods            map[string]bool
323
        podNetIps           metadata.NetIps
324
        podNetIpsAnnotation string
325
}
326

327
type openstackOpflexOdevInfo struct {
328
        opflexODevDn map[string]struct{}
329
        fabricPathDn string
330
}
331

332
type serviceMeta struct {
333
        requestedIps     []net.IP
334
        ingressIps       []net.IP
335
        staticIngressIps []net.IP
336
}
337

338
type ipIndexEntry struct {
339
        ipNet net.IPNet
340
        keys  map[string]bool
341
}
342

343
type targetPort struct {
344
        proto v1.Protocol
345
        ports []int
346
}
347

348
type portIndexEntry struct {
349
        port              targetPort
350
        serviceKeys       map[string]bool
351
        networkPolicyKeys map[string]bool
352
}
353

354
type portRangeSnat struct {
355
        start int
356
        end   int
357
}
358

359
// EndPointData holds PodIF data in controller.
360
type EndPointData struct {
361
        MacAddr    string
362
        EPG        string
363
        Namespace  string
364
        AppProfile string
365
}
366

367
type ctrPortNameEntry struct {
368
        // Proto+port->pods
369
        ctrNmpToPods map[string]map[string]bool
370
}
371

372
type LinkData struct {
373
        Link []string
374
        Pods []string
375
}
376

377
type RoutedNodeData struct {
378
        addr string
379
        idx  int
380
}
381

382
type RoutedNetworkData struct {
383
        subnet       string
384
        netAddress   string
385
        maskLen      int
386
        numAllocated int
387
        maxAddresses int
388
        baseAddress  net.IP
389
        nodeMap      map[string]RoutedNodeData
390
        availableMap map[int]bool
391
}
392

393
type AdditionalNetworkMeta struct {
394
        NetworkName string
395
        EncapVlan   string
396
        //node+localiface->fabricLinks
397
        FabricLink map[string]map[string]LinkData
398
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
399
        Mode       util.EncapMode
400
}
401

402
type ServiceEndPointType interface {
403
        InitClientInformer(kubeClient *kubernetes.Clientset)
404
        Run(stopCh <-chan struct{})
405
        Wait(stopCh <-chan struct{})
406
        UpdateServicesForNode(nodename string)
407
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
408
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
409
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
410
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
411
}
412

413
type serviceEndpoint struct {
414
        cont *AciController
415
}
416
type serviceEndpointSlice struct {
417
        cont *AciController
418
}
419

420
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
421
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
422
}
×
423

424
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
425
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
426
}
×
427

428
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
429
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
430
}
1✔
431

432
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
433
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
434
}
1✔
435

436
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
437
        cache.WaitForCacheSync(stopCh,
1✔
438
                sep.cont.endpointsInformer.HasSynced,
1✔
439
                sep.cont.serviceInformer.HasSynced)
1✔
440
}
1✔
441

442
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
443
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
444
        cache.WaitForCacheSync(stopCh,
1✔
445
                seps.cont.endpointSliceInformer.HasSynced,
1✔
446
                seps.cont.serviceInformer.HasSynced)
1✔
447
}
1✔
448

449
func (e *ipIndexEntry) Network() net.IPNet {
1✔
450
        return e.ipNet
1✔
451
}
1✔
452

453
func newNodePodNetMeta() *nodePodNetMeta {
1✔
454
        return &nodePodNetMeta{
1✔
455
                nodePods: make(map[string]bool),
1✔
456
        }
1✔
457
}
1✔
458

459
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
460
        return workqueue.NewNamedRateLimitingQueue(
1✔
461
                workqueue.NewMaxOfRateLimiter(
1✔
462
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
463
                                10*time.Second),
1✔
464
                        &workqueue.BucketRateLimiter{
1✔
465
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
466
                        },
1✔
467
                ),
1✔
468
                "delta")
1✔
469
}
1✔
470

471
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
472
        cont := &AciController{
1✔
473
                log:          log,
1✔
474
                config:       config,
1✔
475
                env:          env,
1✔
476
                defaultEg:    "",
1✔
477
                defaultSg:    "",
1✔
478
                unitTestMode: unittestmode,
1✔
479

1✔
480
                podQueue:              createQueue("pod"),
1✔
481
                netPolQueue:           createQueue("networkPolicy"),
1✔
482
                qosQueue:              createQueue("qos"),
1✔
483
                netflowQueue:          createQueue("netflow"),
1✔
484
                erspanQueue:           createQueue("erspan"),
1✔
485
                serviceQueue:          createQueue("service"),
1✔
486
                snatQueue:             createQueue("snat"),
1✔
487
                snatNodeInfoQueue:     createQueue("snatnodeinfo"),
1✔
488
                rdConfigQueue:         createQueue("rdconfig"),
1✔
489
                istioQueue:            createQueue("istio"),
1✔
490
                nodeFabNetAttQueue:    createQueue("nodefabricnetworkattachment"),
1✔
491
                netFabConfigQueue:     createQueue("networkfabricconfiguration"),
1✔
492
                nadVlanMapQueue:       createQueue("nadvlanmap"),
1✔
493
                fabricVlanPoolQueue:   createQueue("fabricvlanpool"),
1✔
494
                netFabL3ConfigQueue:   createQueue("networkfabricl3configuration"),
1✔
495
                remIpContQueue:        createQueue("remoteIpContainer"),
1✔
496
                epgDnCacheUpdateQueue: createQueue("epgDnCache"),
1✔
497
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
498
                        &workqueue.BucketRateLimiter{
1✔
499
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
500
                        }, "sync"),
1✔
501

1✔
502
                configuredPodNetworkIps: newNetIps(),
1✔
503
                podNetworkIps:           newNetIps(),
1✔
504
                serviceIps:              ipam.NewIpCache(),
1✔
505
                staticServiceIps:        newNetIps(),
1✔
506
                nodeServiceIps:          newNetIps(),
1✔
507

1✔
508
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
509
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
510
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
511

1✔
512
                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
1✔
513
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
1✔
514
                serviceMetaCache:            make(map[string]*serviceMeta),
1✔
515
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
1✔
516
                snatServices:                make(map[string]bool),
1✔
517
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
1✔
518
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
1✔
519
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
1✔
520
                podIftoEp:                   make(map[string]*EndPointData),
1✔
521
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
522
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
1✔
523
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
1✔
524
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
1✔
525
                nmPortNp:                    make(map[string]bool),
1✔
526
                hppRef:                      make(map[string]hppReference),
1✔
527
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
1✔
528
                sharedEncapCache:            make(map[int]*sharedEncapData),
1✔
529
                sharedEncapAepCache:         make(map[string]map[int]bool),
1✔
530
                sharedEncapSviCache:         make(map[int]*NfL3Data),
1✔
531
                sharedEncapVrfCache:         make(map[string]*NfVrfData),
1✔
532
                sharedEncapTenantCache:      make(map[string]*NfTenantData),
1✔
533
                sharedEncapNfcCache:         make(map[int]*NfcData),
1✔
534
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
1✔
535
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
1✔
536
                sharedEncapNfcAppProfileMap: make(map[string]bool),
1✔
537
                sharedEncapLabelMap:         make(map[string][]int),
1✔
538
                lldpIfCache:                 make(map[string]*NfLLDPIfData),
1✔
539
                fabricVlanPoolMap:           make(map[string]map[string]string),
1✔
540
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
1✔
541
                hostFabricPathDnMap:         make(map[string]hostFabricInfo),
1✔
542
                nsRemoteIpCont:              make(map[string]remoteIpConts),
1✔
543
        }
1✔
544
        cont.syncProcessors = map[string]func() bool{
1✔
545
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
546
                "rdConfig":       cont.syncRdConfig,
1✔
547
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
548
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
549
                   and aci-containers-operator images for istio.io/istio package
1✔
550

1✔
551
                "istioCR":        cont.createIstioCR,
1✔
552
                */
1✔
553
        }
1✔
554
        return cont
1✔
555
}
1✔
556

557
func (cont *AciController) Init() {
×
558
        if cont.config.ChainedMode {
×
559
                cont.log.Info("In chained mode")
×
560
        }
×
561

562
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
563
        if err != nil {
×
564
                cont.log.Error("Could not serialize default endpoint group")
×
565
                panic(err.Error())
×
566
        }
567
        cont.defaultEg = string(egdata)
×
568

×
569
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
570
        if err != nil {
×
571
                cont.log.Error("Could not serialize default security groups")
×
572
                panic(err.Error())
×
573
        }
574
        cont.defaultSg = string(sgdata)
×
575

×
576
        cont.log.Debug("Initializing IPAM")
×
577
        cont.initIpam()
×
578
        // check if the cluster supports endpoint slices
×
579
        // if cluster doesn't have the support fallback to endpoints
×
580
        kubeClient := cont.env.(*K8sEnvironment).kubeClient
×
581
        if util.IsEndPointSlicesSupported(kubeClient) {
×
582
                cont.serviceEndPoints = &serviceEndpointSlice{}
×
583
                cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
584
                cont.log.Info("Initializing ServiceEndpointSlices")
×
585
        } else {
×
586
                cont.serviceEndPoints = &serviceEndpoint{}
×
587
                cont.serviceEndPoints.(*serviceEndpoint).cont = cont
×
588
                cont.log.Info("Initializing ServiceEndpoints")
×
589
        }
×
590

591
        err = cont.env.Init(cont)
×
592
        if err != nil {
×
593
                panic(err.Error())
×
594
        }
595
}
596

597
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
598
        store cache.Store, handler func(interface{}) bool,
599
        deleteHandler func(string) bool,
600
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
601
        go wait.Until(func() {
2✔
602
                for {
2✔
603
                        key, quit := queue.Get()
1✔
604
                        if quit {
2✔
605
                                break
1✔
606
                        }
607

608
                        var requeue bool
1✔
609
                        switch key := key.(type) {
1✔
610
                        case chan struct{}:
×
611
                                close(key)
×
612
                        case string:
1✔
613
                                if strings.HasPrefix(key, "DELETED_") {
2✔
614
                                        delKey := strings.Trim(key, "DELETED_")
1✔
615
                                        requeue = deleteHandler(delKey)
1✔
616
                                } else {
2✔
617
                                        obj, exists, err := store.GetByKey(key)
1✔
618
                                        if err != nil {
1✔
619
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
620
                                        }
×
621
                                        //Handle Add/Update/Delete
622
                                        if exists && handler != nil {
2✔
623
                                                requeue = handler(obj)
1✔
624
                                        }
1✔
625
                                        //Handle Post Delete
626
                                        if !exists && postDelHandler != nil {
1✔
627
                                                requeue = postDelHandler()
×
628
                                        }
×
629
                                }
630
                        }
631
                        if requeue {
2✔
632
                                queue.AddRateLimited(key)
1✔
633
                        } else {
2✔
634
                                queue.Forget(key)
1✔
635
                        }
1✔
636
                        queue.Done(key)
1✔
637
                }
638
        }, time.Second, stopCh)
639
        <-stopCh
1✔
640
        queue.ShutDown()
1✔
641
}
642

643
func (cont *AciController) processRemIpContQueue(queue workqueue.RateLimitingInterface,
644
        handler func(interface{}) bool,
645
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
646
        go wait.Until(func() {
2✔
647
                for {
2✔
648
                        key, quit := queue.Get()
1✔
649
                        if quit {
2✔
650
                                break
1✔
651
                        }
652

653
                        var requeue bool
1✔
654
                        switch key := key.(type) {
1✔
655
                        case chan struct{}:
×
656
                                close(key)
×
657
                        case string:
1✔
658
                                if handler != nil {
2✔
659
                                        requeue = handler(key)
1✔
660
                                }
1✔
661
                                if postDelHandler != nil {
2✔
662
                                        requeue = postDelHandler()
1✔
663
                                }
1✔
664
                        }
665
                        if requeue {
1✔
666
                                queue.AddRateLimited(key)
×
667
                        } else {
1✔
668
                                queue.Forget(key)
1✔
669
                        }
1✔
670
                        queue.Done(key)
1✔
671

672
                }
673
        }, time.Second, stopCh)
674
        <-stopCh
1✔
675
        queue.ShutDown()
1✔
676
}
677

678
func (cont *AciController) processEpgDnCacheUpdateQueue(queue workqueue.RateLimitingInterface,
679
        handler func(interface{}) bool,
680
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
681
        go wait.Until(func() {
2✔
682
                for {
2✔
683
                        key, quit := queue.Get()
1✔
684
                        if quit {
2✔
685
                                break
1✔
686
                        }
687

688
                        var requeue bool
1✔
689
                        switch key := key.(type) {
1✔
690
                        case chan struct{}:
×
691
                                close(key)
×
692
                        case bool:
1✔
693
                                if handler != nil {
2✔
694
                                        requeue = handler(key)
1✔
695
                                }
1✔
696
                                if postDelHandler != nil {
1✔
697
                                        requeue = postDelHandler()
×
698
                                }
×
699
                        }
700
                        if requeue {
1✔
701
                                queue.AddRateLimited(key)
×
702
                        } else {
1✔
703
                                queue.Forget(key)
1✔
704
                        }
1✔
705
                        queue.Done(key)
1✔
706

707
                }
708
        }, time.Second, stopCh)
709
        <-stopCh
1✔
710
        queue.ShutDown()
1✔
711
}
712

713
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
714
        return apicapi.ApicSlice{}
1✔
715
}
1✔
716

717
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
718
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
719
}
1✔
720

721
func (cont *AciController) initStaticObjs() {
1✔
722
        cont.env.InitStaticAciObjects()
1✔
723
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
724
                cont.globalStaticObjs())
1✔
725
}
1✔
726

727
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
728
        vmmProv = "Kubernetes"
1✔
729
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
730
                vmmProv = "OpenShift"
×
731
        }
×
732
        return
1✔
733
}
734

735
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
736
        var err error
1✔
737
        var privKey []byte
1✔
738
        var apicCert []byte
1✔
739

1✔
740
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
741

1✔
742
        if cont.config.ApicPrivateKeyPath != "" {
1✔
743
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
744
                if err != nil {
×
745
                        panic(err)
×
746
                }
747
        }
748
        if cont.config.ApicCertPath != "" {
1✔
749
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
750
                if err != nil {
×
751
                        panic(err)
×
752
                }
753
        }
754
        // If not defined, default is 1800
755
        if cont.config.ApicRefreshTimer == "" {
2✔
756
                cont.config.ApicRefreshTimer = "1800"
1✔
757
        }
1✔
758
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
759
        if err != nil {
1✔
760
                panic(err)
×
761
        }
762
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
763

1✔
764
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
765
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
766
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
767
                panic(err)
×
768
        }
769

770
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
771
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
772
                cont.config.ApicRefreshTickerAdjust = "210"
1✔
773
        }
1✔
774
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
775
        if err != nil {
1✔
776
                panic(err)
×
777
        }
778

779
        //If ApicSubscriptionDelay is not defined, default to 100ms
780
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
781
                cont.config.ApicSubscriptionDelay = 100
1✔
782
        }
1✔
783
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
784

1✔
785
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
786
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
787
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
788
        }
1✔
789

790
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
791
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
792
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
793
        }
1✔
794
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
795

1✔
796
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
797
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
798
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
799
        }
1✔
800

801
        // If not defined, default to 32
802
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
803
                cont.config.PodIpPoolChunkSize = 32
1✔
804
        }
1✔
805
        if !cont.config.ChainedMode {
2✔
806
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
807
        }
1✔
808

809
        // If ApicConnectionRetryLimit is not defined, default to 5
810
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
811
                cont.config.ApicConnectionRetryLimit = 5
1✔
812
        }
1✔
813
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
814

1✔
815
        // If not valid, default to 5000-65000
1✔
816
        // other permissible values 1-65000
1✔
817
        defStart := 5000
1✔
818
        defEnd := 65000
1✔
819
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
820
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
821
        }
1✔
822
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
823
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
824
        }
1✔
825
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
826
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
827
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
828
                cont.config.SnatDefaultPortRangeStart = defStart
×
829
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
830
        }
×
831

832
        // Set default value for pbr programming delay if services list is not empty
833
        // and delay value is empty
834
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
835
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
836
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
837
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
838
        }
×
839
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
840
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
841
        }
×
842

843
        // Set contract scope for snat svc graph to global by default
844
        if cont.config.SnatSvcContractScope == "" {
2✔
845
                cont.config.SnatSvcContractScope = "global"
1✔
846
        }
1✔
847
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
848
                cont.config.MaxSvcGraphNodes = 32
1✔
849
        }
1✔
850
        if !cont.config.ChainedMode {
2✔
851
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
852
        }
1✔
853
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
854
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
855
                privKey, apicCert, cont.config.AciPrefix,
1✔
856
                refreshTimeout, refreshTickerAdjust, cont.config.ApicSubscriptionDelay,
1✔
857
                cont.config.AciVrfTenant, cont.UpdateLLDPIfLocked)
1✔
858
        if err != nil {
1✔
859
                panic(err)
×
860
        }
861

862
        cont.apicConn.FilterOpflexDevice = cont.config.FilterOpflexDevice
1✔
863
        cont.apicConn.Flavor = cont.config.Flavor
1✔
864
        cont.apicConn.VmmDomain = cont.config.AciVmmDomain
1✔
865
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
866
        cont.apicConn.RequestRetryDelay = cont.config.ApicRequestRetryDelay
1✔
867
        cont.apicConn.EnableRequestRetry = cont.config.EnableApicRequestRetry
1✔
868

1✔
869
        if len(cont.config.ApicHosts) != 0 {
1✔
870
        APIC_SWITCH:
×
871
                cont.log.WithFields(logrus.Fields{
×
872
                        "mod":  "APICAPI",
×
873
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
874
                }).Debug("Connecting to APIC to determine the Version")
×
875

×
876
                version, err := cont.apicConn.GetVersion()
×
877
                if err != nil {
×
878
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
UNCOV
879
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
880
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
881
                        goto APIC_SWITCH
×
882
                }
883
                cont.apicConn.CachedVersion = version
×
884
                apicapi.ApicVersion = version
×
885
                if version >= "4.2(4i)" {
×
886
                        cont.apicConn.SnatPbrFltrChain = true
×
887
                } else {
×
888
                        cont.apicConn.SnatPbrFltrChain = false
×
889
                }
×
UNCOV
890
                if version >= "5.2" {
×
UNCOV
891
                        cont.vmmClusterFaultSupported = true
×
UNCOV
892
                }
×
893
        } else { // For unit-tests
1✔
894
                cont.apicConn.SnatPbrFltrChain = true
1✔
895
        }
1✔
896

897
        if !cont.config.ChainedMode {
2✔
898
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
899
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
900
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
901
                        var expectedVrfRelations []string
×
902
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
903
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
904
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
UNCOV
905
                        if err != nil {
×
UNCOV
906
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
UNCOV
907
                                panic(err)
×
908
                        }
909
                }
910
        }
911

912
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.config.ChainedMode {
1✔
913
                //Clear fault instances when the controller starts
×
914
                cont.clearFaultInstances()
×
915
                //Subscribe for vmmEpPD for a given domain
×
916
                var tnTargetFilterEpg string
×
917
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
918
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
919
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
920
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
921
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
922
                        func(obj apicapi.ApicObject) bool {
×
923
                                cont.vmmEpPDChanged(obj)
×
924
                                return true
×
925
                        },
×
UNCOV
926
                        func(dn string) {
×
UNCOV
927
                                cont.vmmEpPDDeleted(dn)
×
UNCOV
928
                        })
×
929
        }
930

931
        cont.initStaticObjs()
1✔
932

1✔
933
        err = cont.env.PrepareRun(stopCh)
1✔
934
        if err != nil {
1✔
UNCOV
935
                panic(err.Error())
×
936
        }
937

938
        cont.apicConn.FullSyncHook = func() {
1✔
939
                // put a channel into each work queue and wait on it to
×
940
                // checkpoint object syncing in response to new subscription
×
941
                // updates
×
942
                cont.log.Debug("Starting checkpoint")
×
943
                var chans []chan struct{}
×
944
                qs := make([]workqueue.RateLimitingInterface, 0)
×
945
                _, ok := cont.env.(*K8sEnvironment)
×
946
                if ok {
×
947
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
948
                        if !cont.config.ChainedMode {
×
949
                                if !cont.config.DisableHppRendering {
×
950
                                        qs = append(qs, cont.netPolQueue)
×
951
                                }
×
952
                                if cont.config.EnableHppDirect {
×
953
                                        qs = append(qs, cont.remIpContQueue)
×
954
                                }
×
955
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
UNCOV
956
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
UNCOV
957
                                        cont.rdConfigQueue, cont.erspanQueue,
×
958
                                        cont.epgDnCacheUpdateQueue)
×
959
                        }
960
                }
961
                for _, q := range qs {
×
962
                        c := make(chan struct{})
×
963
                        chans = append(chans, c)
×
964
                        q.Add(c)
×
965
                }
×
966
                for _, c := range chans {
×
UNCOV
967
                        <-c
×
UNCOV
968
                }
×
UNCOV
969
                cont.log.Debug("Checkpoint complete")
×
970
        }
971

972
        if len(cont.config.ApicHosts) != 0 && !cont.config.ChainedMode {
1✔
973
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
974
                cont.scheduleRdConfig()
×
UNCOV
975
                if strings.Contains(cont.config.Flavor, "openstack") {
×
UNCOV
976
                        cont.setOpenStackSystemId()
×
UNCOV
977
                }
×
978
        }
979

980
        if !cont.config.ChainedMode {
2✔
981
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
982
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
983
                                []string{"hostprotPol"})
1✔
984
                }
1✔
985
        } else {
1✔
986
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
987
                        []string{"fvBD", "fvAp"})
1✔
988
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
989
                        []string{"fvnsVlanInstP"}, "")
1✔
990
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
991
                        []string{"infraRsDomP"}, "")
1✔
992
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
993
                        []string{"physDomP"}, "")
1✔
994
                cont.apicConn.AddSubscriptionClass("l3extDomP",
1✔
995
                        []string{"l3extDomP"}, "")
1✔
996
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
997
                        []string{"infraRsVlanNs"}, "")
1✔
998
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
999
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
1000
                cont.apicConn.AddSubscriptionClass("l3extOut",
1✔
1001
                        []string{"l3extInstP", "l3extSubnet", "fvRsCons", "fvRsProv", "l3extRsEctx", "l3extRsL3DomAtt", "l3extLNodeP", "l3extRsNodeL3OutAtt", "ipRouteP", "ipNexthopP", "l3extLIfP", "l3extVirtualLIfP", "l3extRsDynPathAtt",
1✔
1002
                                "l3extRsPathL3OutAtt", "l3extMember", "l3extIp", "bgpExtP", "bgpPeerP", "bgpAsP", "bgpLocalAsnP", "bgpRsPeerPfxPol"}, "")
1✔
1003
                cont.apicConn.AddSubscriptionClass("bgpPeerPfxPol",
1✔
1004
                        []string{"bgpPeerPfxPol"}, "")
1✔
1005
        }
1✔
1006
        if !cont.config.ChainedMode {
2✔
1007
                // When a new class is added for subscriptio, check if its name attribute
1✔
1008
                // is in the format aciPrefix-<some value>, if so add it in nameAttrClass
1✔
1009
                // in apicapi.go
1✔
1010
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
1011
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
1012
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
1013
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
UNCOV
1014
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
UNCOV
1015
                }
×
1016
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
1017
                        subscribeMo)
1✔
1018
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
1019
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
1020
                        []string{"fvRsCons"})
1✔
1021
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
1022
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
1023
                        cont.config.AciVmmController)
1✔
1024
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
1025
                // Since it is not supported for APIC versions < "5.0"
1✔
1026
                cont.addVmmInjectedLabel()
1✔
1027
                cont.apicConn.AddSubscriptionDn(vmmDn,
1✔
1028
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
1029

1✔
1030
                var tnTargetFilter string
1✔
1031
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
UNCOV
1032
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
UNCOV
1033
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
UNCOV
1034
                        }
×
1035
                } else {
1✔
1036
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
1037
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
1038
                }
1✔
1039
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
1040
                        tnTargetFilter)
1✔
1041
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
1042
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
1043

1✔
1044
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
1045
                        func(obj apicapi.ApicObject) bool {
1✔
1046
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
1047
                                return true
×
1048
                        },
×
UNCOV
1049
                        func(dn string) {
×
UNCOV
1050
                                cont.SubnetDeleted(dn)
×
UNCOV
1051
                        })
×
1052

1053
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
1054
                        []string{"opflexODev"}, "")
1✔
1055

1✔
1056
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
1057
                        func(obj apicapi.ApicObject) bool {
1✔
UNCOV
1058
                                cont.opflexDeviceChanged(obj)
×
UNCOV
1059
                                return true
×
UNCOV
1060
                        },
×
1061
                        func(dn string) {
×
1062
                                cont.opflexDeviceDeleted(dn)
×
1063
                        })
×
1064

1065
                if !cont.config.DisableServiceVlanPreprovisioning && !strings.Contains(cont.config.Flavor, "openstack") {
2✔
1066
                        if cont.config.AEP == "" {
2✔
1067
                                cont.log.Error("AEP is missing in configuration, preprovisioning of service vlan will be disabled")
1✔
1068
                        } else {
1✔
UNCOV
1069
                                infraRtAttEntPFilter := fmt.Sprintf("and(wcard(infraRtAttEntP.dn,\"/attentp-%s/\"))", cont.config.AEP)
×
UNCOV
1070
                                cont.apicConn.AddSubscriptionClass("infraRtAttEntP",
×
UNCOV
1071
                                        []string{"infraRtAttEntP"}, infraRtAttEntPFilter)
×
1072

×
1073
                                // For bare metal, the infraRtAttEntP associated with an AEP will be empty.
×
1074
                                // We should not receive any updates for such cases.
×
1075
                                cont.apicConn.SetSubscriptionHooks("infraRtAttEntP",
×
1076
                                        func(obj apicapi.ApicObject) bool {
×
1077
                                                cont.infraRtAttEntPChanged(obj)
×
1078
                                                return true
×
1079
                                        },
×
1080
                                        func(dn string) {
×
1081
                                                cont.infraRtAttEntPDeleted(dn)
×
1082
                                        })
×
1083

1084
                                cont.apicConn.AddSubscriptionClass("vpcIf",
×
1085
                                        []string{"vpcIf"}, "")
×
UNCOV
1086

×
1087
                                cont.apicConn.SetSubscriptionHooks("vpcIf",
×
1088
                                        func(obj apicapi.ApicObject) bool {
×
1089
                                                cont.vpcIfChanged(obj)
×
1090
                                                return true
×
1091
                                        },
×
1092
                                        func(dn string) {
×
1093
                                                cont.vpcIfDeleted(dn)
×
1094
                                        })
×
1095
                        }
1096
                }
1097

1098
                cont.apicConn.VersionUpdateHook =
1✔
1099
                        func() {
1✔
UNCOV
1100
                                cont.initStaticServiceObjs()
×
UNCOV
1101
                        }
×
1102
        }
1103
        go cont.apicConn.Run(stopCh)
1✔
1104
}
1105

1106
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1107
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
1108
        ticker := time.NewTicker(seconds * time.Second)
1✔
1109
        defer ticker.Stop()
1✔
1110

1✔
1111
        for {
2✔
1112
                select {
1✔
1113
                case <-ticker.C:
1✔
1114
                        if cont.config.EnableOpflexAgentReconnect {
1✔
UNCOV
1115
                                cont.checkChangeOfOpflexOdevAciPod()
×
UNCOV
1116
                        }
×
1117
                        if cont.config.AciMultipod {
1✔
1118
                                cont.checkChangeOfOdevAciPod()
×
1119
                        }
×
1120
                case <-stopCh:
1✔
1121
                        return
1✔
1122
                }
1123
        }
1124
}
1125

1126
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1127
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
1128
        ticker := time.NewTicker(seconds * time.Second)
1✔
1129
        defer ticker.Stop()
1✔
1130

1✔
1131
        for {
2✔
1132
                select {
1✔
1133
                case <-ticker.C:
1✔
1134
                        cont.deleteOldOpflexDevices()
1✔
1135
                case <-stopCh:
1✔
1136
                        return
1✔
1137
                }
1138
        }
1139
}
1140

1141
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1142
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
1143
        ticker := time.NewTicker(seconds * time.Second)
1✔
1144
        defer ticker.Stop()
1✔
1145

1✔
1146
        for {
2✔
1147
                select {
1✔
1148
                case <-ticker.C:
1✔
1149
                        cont.processDelayedEpSlices()
1✔
1150
                case <-stopCh:
1✔
1151
                        return
1✔
1152
                }
1153
        }
1154
}
1155

1156
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
1157
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
1158
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
1159
        iteration := 0
1✔
1160
        for {
2✔
1161
                // To avoid noisy logs, only printing once in 5 minutes
1✔
1162
                if iteration%5 == 0 {
2✔
1163
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
1164
                }
1✔
1165
                var nodeInfos []*nodeinfo.NodeInfo
1✔
1166
                cont.indexMutex.Lock()
1✔
1167
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
1168
                        func(nodeInfoObj interface{}) {
2✔
1169
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
1170
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
1171
                        })
1✔
1172
                expectedmap := make(map[string]map[string]bool)
1✔
1173
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
1174
                        for nodename, entry := range glinfo {
2✔
1175
                                if _, found := expectedmap[nodename]; !found {
2✔
1176
                                        newentry := make(map[string]bool)
1✔
1177
                                        newentry[entry.SnatPolicyName] = true
1✔
1178
                                        expectedmap[nodename] = newentry
1✔
1179
                                } else {
2✔
1180
                                        currententry := expectedmap[nodename]
1✔
1181
                                        currententry[entry.SnatPolicyName] = true
1✔
1182
                                        expectedmap[nodename] = currententry
1✔
1183
                                }
1✔
1184
                        }
1185
                }
1186
                cont.indexMutex.Unlock()
1✔
1187

1✔
1188
                for _, value := range nodeInfos {
2✔
1189
                        marked := false
1✔
1190
                        policyNames := value.Spec.SnatPolicyNames
1✔
1191
                        nodeName := value.ObjectMeta.Name
1✔
1192
                        _, ok := expectedmap[nodeName]
1✔
1193
                        if !ok && len(policyNames) > 0 {
2✔
1194
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
1195
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
1196
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1197
                                marked = true
1✔
1198
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
1199
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1200
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1201
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1202
                                marked = true
1✔
1203
                        } else {
2✔
1204
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
UNCOV
1205
                                        // No snatpolicies present
×
UNCOV
1206
                                        continue
×
1207
                                }
1208
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
1209
                                if !eq {
2✔
1210
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1211
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1212
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1213
                                        marked = true
1✔
1214
                                }
1✔
1215
                        }
1216
                        if marked {
2✔
1217
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
1218
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
1219
                                if err != nil {
1✔
UNCOV
1220
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
UNCOV
1221
                                        continue
×
1222
                                }
1223
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1224
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1225
                        } else if iteration%5 == 0 {
2✔
1226
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1227
                        }
1✔
1228
                }
1229
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1230
                iteration++
1✔
1231
        }
1232
}
1233

1234
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1235
        queueStop <-chan struct{}) {
1✔
1236
        go wait.Until(func() {
2✔
1237
                for {
2✔
1238
                        syncType, quit := queue.Get()
1✔
1239
                        if quit {
2✔
1240
                                break
1✔
1241
                        }
1242
                        var requeue bool
1✔
1243
                        if sType, ok := syncType.(string); ok {
2✔
1244
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1245
                                        requeue = f()
1✔
1246
                                }
1✔
1247
                        }
1248
                        if requeue {
1✔
UNCOV
1249
                                queue.AddRateLimited(syncType)
×
1250
                        } else {
1✔
1251
                                queue.Forget(syncType)
1✔
1252
                        }
1✔
1253
                        queue.Done(syncType)
1✔
1254
                }
1255
        }, time.Second, queueStop)
1256
        <-queueStop
1✔
1257
        queue.ShutDown()
1✔
1258
}
1259

1260
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1261
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1262
}
1✔
UNCOV
1263
func (cont *AciController) scheduleRdConfig() {
×
UNCOV
1264
        cont.syncQueue.AddRateLimited("rdConfig")
×
UNCOV
1265
}
×
1266
func (cont *AciController) scheduleCreateIstioCR() {
×
1267
        cont.syncQueue.AddRateLimited("istioCR")
×
1268
}
×
1269

1270
func (cont *AciController) addVmmInjectedLabel() {
1✔
1271
        if apicapi.ApicVersion >= "5.2" {
1✔
UNCOV
1272
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
UNCOV
1273
                if err != nil {
×
UNCOV
1274
                        panic(err.Error())
×
1275
                }
1276
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1277
                if err != nil {
×
UNCOV
1278
                        panic(err.Error())
×
1279
                }
1280
        }
1281
        if apicapi.ApicVersion >= "5.0" {
2✔
1282
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1283
                if err != nil {
1✔
UNCOV
1284
                        panic(err.Error())
×
1285
                }
1286
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1287
                if err != nil {
1✔
UNCOV
1288
                        panic(err.Error())
×
1289
                }
1290
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1291
                if err != nil {
1✔
UNCOV
1292
                        panic(err.Error())
×
1293
                }
1294
        }
1295
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc