• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11713

16 Mar 2026 10:04AM UTC coverage: 62.537% (-0.4%) from 62.911%
11713

Pull #1687

travis-pro

web-flow
Merge 7a3798e9a into 517f6a009
Pull Request #1687: Add infra querier subnet handling for multipod migration

5 of 122 new or added lines in 4 files covered. (4.1%)

11 existing lines in 2 files now uncovered.

13491 of 21573 relevant lines covered (62.54%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.48
/pkg/controller/controller.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "encoding/json"
19
        "fmt"
20
        "net"
21
        "os"
22
        "reflect"
23
        "strconv"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/sirupsen/logrus"
29
        "github.com/yl2chen/cidranger"
30
        "golang.org/x/time/rate"
31

32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        v1net "k8s.io/api/networking/v1"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/client-go/util/workqueue"
40

41
        "github.com/noironetworks/aci-containers/pkg/apicapi"
42
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/apis/aci.fabricattachment/v1"
43
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
44
        hppv1 "github.com/noironetworks/aci-containers/pkg/hpp/apis/aci.hpp/v1"
45
        "github.com/noironetworks/aci-containers/pkg/index"
46
        "github.com/noironetworks/aci-containers/pkg/ipam"
47
        istiov1 "github.com/noironetworks/aci-containers/pkg/istiocrd/apis/aci.istio/v1"
48
        "github.com/noironetworks/aci-containers/pkg/metadata"
49
        nodeinfo "github.com/noironetworks/aci-containers/pkg/nodeinfo/apis/aci.snat/v1"
50
        rdConfig "github.com/noironetworks/aci-containers/pkg/rdconfig/apis/aci.snat/v1"
51
        snatglobalinfo "github.com/noironetworks/aci-containers/pkg/snatglobalinfo/apis/aci.snat/v1"
52
        "github.com/noironetworks/aci-containers/pkg/util"
53
)
54

55
type podUpdateFunc func(*v1.Pod) (*v1.Pod, error)
56
type nodeUpdateFunc func(*v1.Node) (*v1.Node, error)
57
type serviceUpdateFunc func(*v1.Service) (*v1.Service, error)
58
type listNetworkPoliciesFunc func(string) (*v1net.NetworkPolicyList, error)
59
type listNamespacesFunc func() (*v1.NamespaceList, error)
60

61
type AciController struct {
62
        log    *logrus.Logger
63
        config *ControllerConfig
64
        env    Environment
65

66
        defaultEg string
67
        defaultSg string
68

69
        unitTestMode bool
70

71
        podQueue               workqueue.RateLimitingInterface
72
        netPolQueue            workqueue.RateLimitingInterface
73
        qosQueue               workqueue.RateLimitingInterface
74
        serviceQueue           workqueue.RateLimitingInterface
75
        snatQueue              workqueue.RateLimitingInterface
76
        netflowQueue           workqueue.RateLimitingInterface
77
        erspanQueue            workqueue.RateLimitingInterface
78
        snatNodeInfoQueue      workqueue.RateLimitingInterface
79
        rdConfigQueue          workqueue.RateLimitingInterface
80
        istioQueue             workqueue.RateLimitingInterface
81
        nodeFabNetAttQueue     workqueue.RateLimitingInterface
82
        netFabConfigQueue      workqueue.RateLimitingInterface
83
        nadVlanMapQueue        workqueue.RateLimitingInterface
84
        fabricVlanPoolQueue    workqueue.RateLimitingInterface
85
        netFabL3ConfigQueue    workqueue.RateLimitingInterface
86
        remIpContQueue         workqueue.RateLimitingInterface
87
        epgDnCacheUpdateQueue  workqueue.RateLimitingInterface
88
        aaepMonitorConfigQueue workqueue.RateLimitingInterface
89

90
        namespaceIndexer                     cache.Indexer
91
        namespaceInformer                    cache.Controller
92
        podIndexer                           cache.Indexer
93
        podInformer                          cache.Controller
94
        endpointsIndexer                     cache.Indexer
95
        endpointsInformer                    cache.Controller
96
        serviceIndexer                       cache.Indexer
97
        serviceInformer                      cache.Controller
98
        replicaSetIndexer                    cache.Indexer
99
        replicaSetInformer                   cache.Controller
100
        deploymentIndexer                    cache.Indexer
101
        deploymentInformer                   cache.Controller
102
        nodeIndexer                          cache.Indexer
103
        nodeInformer                         cache.Controller
104
        networkPolicyIndexer                 cache.Indexer
105
        networkPolicyInformer                cache.Controller
106
        snatIndexer                          cache.Indexer
107
        snatInformer                         cache.Controller
108
        snatNodeInfoIndexer                  cache.Indexer
109
        snatNodeInformer                     cache.Controller
110
        snatLocalInfoInformer                cache.Controller
111
        crdInformer                          cache.Controller
112
        rdConfigInformer                     cache.Controller
113
        rdConfigIndexer                      cache.Indexer
114
        qosIndexer                           cache.Indexer
115
        qosInformer                          cache.Controller
116
        netflowIndexer                       cache.Indexer
117
        netflowInformer                      cache.Controller
118
        erspanIndexer                        cache.Indexer
119
        erspanInformer                       cache.Controller
120
        nodePodIfIndexer                     cache.Indexer
121
        nodePodIfInformer                    cache.Controller
122
        istioIndexer                         cache.Indexer
123
        istioInformer                        cache.Controller
124
        endpointSliceIndexer                 cache.Indexer
125
        endpointSliceInformer                cache.Controller
126
        snatCfgInformer                      cache.Controller
127
        updatePod                            podUpdateFunc
128
        updateNode                           nodeUpdateFunc
129
        updateServiceStatus                  serviceUpdateFunc
130
        listNetworkPolicies                  listNetworkPoliciesFunc
131
        listNamespaces                       listNamespacesFunc
132
        nodeFabNetAttInformer                cache.SharedIndexInformer
133
        netFabConfigInformer                 cache.SharedIndexInformer
134
        nadVlanMapInformer                   cache.SharedIndexInformer
135
        fabricVlanPoolInformer               cache.SharedIndexInformer
136
        networkFabricL3ConfigurationInformer cache.SharedIndexInformer
137
        fabNetAttClient                      *fabattclset.Clientset
138
        proactiveConfInformer                cache.SharedIndexInformer
139
        aaepMonitorInformer                  cache.SharedIndexInformer
140
        poster                               *EventPoster
141

142
        indexMutex sync.Mutex
143
        hppMutex   sync.Mutex
144

145
        configuredPodNetworkIps *netIps
146
        podNetworkIps           *netIps
147
        serviceIps              *ipam.IpCache
148
        staticServiceIps        *netIps
149
        nodeServiceIps          *netIps
150

151
        // index of pods matched by deployments
152
        depPods *index.PodSelectorIndex
153
        // index of pods matched by network policies
154
        netPolPods *index.PodSelectorIndex
155
        // index of pods matched by network policy ingress rules
156
        netPolIngressPods *index.PodSelectorIndex
157
        // index of pods matched by network policy egress rules
158
        netPolEgressPods *index.PodSelectorIndex
159
        // index of IP addresses contained in endpoints objects
160
        endpointsIpIndex cidranger.Ranger
161
        // index of service target ports
162
        targetPortIndex map[string]*portIndexEntry
163
        // index of services with named target ports
164
        namedPortServiceIndex map[string]*namedPortServiceIndexEntry
165
        // index of ip blocks referenced by network policy egress rules
166
        netPolSubnetIndex cidranger.Ranger
167
        // index of pods matched by erspan policies
168
        erspanPolPods *index.PodSelectorIndex
169

170
        apicConn *apicapi.ApicConnection
171

172
        nodeServiceMetaCache map[string]*nodeServiceMeta
173
        nodeACIPod           map[string]aciPodAnnot
174
        nodeACIPodAnnot      map[string]aciPodAnnot
175
        infraQuerierSubnet   string
176
        nodeOpflexDevice     map[string]apicapi.ApicSlice
177
        nodePodNetCache      map[string]*nodePodNetMeta
178
        serviceMetaCache     map[string]*serviceMeta
179
        snatPolicyCache      map[string]*ContSnatPolicy
180
        delayedEpSlices      []*DelayedEpSlice
181
        snatServices         map[string]bool
182
        snatNodeInfoCache    map[string]*nodeinfo.NodeInfo
183
        rdConfigCache        map[string]*rdConfig.RdConfig
184
        rdConfigSubnetCache  map[string]*rdConfig.RdConfigSpec
185
        istioCache           map[string]*istiov1.AciIstioOperator
186
        podIftoEp            map[string]*EndPointData
187
        // Node Name and Policy Name
188
        snatGlobalInfoCache map[string]map[string]*snatglobalinfo.GlobalInfo
189
        nodeSyncEnabled     bool
190
        serviceSyncEnabled  bool
191
        snatSyncEnabled     bool
192
        syncQueue           workqueue.RateLimitingInterface
193
        syncProcessors      map[string]func() bool
194
        serviceEndPoints    ServiceEndPointType
195
        crdHandlers         map[string]func(*AciController, <-chan struct{})
196
        stopCh              <-chan struct{}
197
        //index of containerportname to ctrPortNameEntry
198
        ctrPortNameCache map[string]*ctrPortNameEntry
199
        // named networkPolicies
200
        nmPortNp map[string]bool
201
        //maps network policy hash to hpp
202
        hppRef map[string]hppReference
203
        //map for ns to remoteIpConts
204
        nsRemoteIpCont map[string]remoteIpConts
205
        // cache to look for Epg DNs which are bound to Vmm domain
206
        cachedEpgDns             []string
207
        vmmClusterFaultSupported bool
208
        additionalNetworkCache   map[string]*AdditionalNetworkMeta
209
        //Used in Shared mode
210
        sharedEncapCache       map[int]*sharedEncapData
211
        sharedEncapAepCache    map[string]map[int]bool
212
        sharedEncapSviCache    map[int]*NfL3Data
213
        sharedEncapVrfCache    map[string]*NfVrfData
214
        sharedEncapTenantCache map[string]*NfTenantData
215
        nfl3configGenerationId int64
216
        // vlan to propertiesList
217
        sharedEncapNfcCache         map[int]*NfcData
218
        sharedEncapNfcVlanMap       map[int]*NfcData
219
        sharedEncapNfcLabelMap      map[string]*NfcData
220
        sharedEncapNfcAppProfileMap map[string]bool
221
        // nadVlanMap encapLabel to vlan
222
        sharedEncapLabelMap      map[string][]int
223
        lldpIfCache              map[string]*NfLLDPIfData
224
        globalVlanConfig         globalVlanConfig
225
        fabricVlanPoolMap        map[string]map[string]string
226
        openStackFabricPathDnMap map[string]openstackOpflexOdevInfo
227
        hostFabricPathDnMap      map[string]hostFabricInfo
228
        openStackSystemId        string
229
        sharedAaepMonitor        map[string]map[string]*AaepEpgAttachData
230
}
231

232
// hostFabricInfo is the value type of AciController.hostFabricPathDnMap.
// It pairs a fabric path DN and a host name with a string set (vpcIfDn)
// implemented as map[string]struct{}.
type hostFabricInfo struct {
	fabricPathDn string
	host         string
	vpcIfDn      map[string]struct{}
}
237

238
// NfLLDPIfData is the value type of AciController.lldpIfCache: an LLDP
// interface string together with the set of references to it.
type NfLLDPIfData struct {
	LLDPIf string
	// References are tracked per NAD for now. Tracking at a finer
	// granularity would add interface-tracking complexity without
	// enough benefit.
	Refs map[string]bool
}
245

246
type NfL3OutData struct {
247
        // +kubebuilder:validation:Enum:"import"
248
        RtCtrl     string
249
        PodId      int
250
        RtrNodeMap map[int]*fabattv1.FabricL3OutRtrNode
251
        ExtEpgMap  map[string]*fabattv1.PolicyPrefixGroup
252
        SviMap     map[int]bool
253
}
254

255
type NfTenantData struct {
256
        CommonTenant     bool
257
        L3OutConfig      map[string]*NfL3OutData
258
        BGPPeerPfxConfig map[string]*fabattv1.BGPPeerPrefixPolicy
259
}
260

261
type NfVrfData struct {
262
        TenantConfig map[string]*NfTenantData
263
}
264

265
type NfL3Networks struct {
266
        fabattv1.PrimaryNetwork
267
        Subnets map[string]*fabattv1.FabricL3Subnet
268
}
269

270
type NfL3Data struct {
271
        Tenant      string
272
        Vrf         fabattv1.VRF
273
        PodId       int
274
        ConnectedNw *NfL3Networks
275
        NetAddr     map[string]*RoutedNetworkData
276
        Nodes       map[int]fabattv1.FabricL3OutNode
277
}
278

279
// remoteIpConts is keyed by pod name; each value is that pod's remoteIpCont.
type remoteIpConts map[string]remoteIpCont

// remoteIpCont is keyed by IP address; each value is the pod label map
// (label name to label value) recorded for that address.
type remoteIpCont map[string]map[string]string
284

285
type NfcData struct {
286
        Aeps map[string]bool
287
        Epg  fabattv1.Epg
288
}
289

290
type sharedEncapData struct {
291
        //node to NAD to pods
292
        Pods   map[string]map[string][]string
293
        NetRef map[string]*AdditionalNetworkMeta
294
        Aeps   map[string]bool
295
}
296

297
type globalVlanConfig struct {
298
        SharedPhysDom apicapi.ApicObject
299
        SharedL3Dom   apicapi.ApicObject
300
}
301

302
type hppReference struct {
303
        RefCount uint              `json:"ref-count,omitempty"`
304
        Npkeys   []string          `json:"npkeys,omitempty"`
305
        HppObj   apicapi.ApicSlice `json:"hpp-obj,omitempty"`
306
        HppCr    hppv1.HostprotPol `json:"hpp-cr,omitempty"`
307
}
308

309
type DelayedEpSlice struct {
310
        ServiceKey  string
311
        OldEpSlice  *discovery.EndpointSlice
312
        NewEpSlice  *discovery.EndpointSlice
313
        DelayedTime time.Time
314
}
315

316
// aciPodAnnot is the value type of the AciController nodeACIPod and
// nodeACIPodAnnot maps: an ACI pod string plus disconnect and last-error
// timestamps.
type aciPodAnnot struct {
	aciPod         string
	disconnectTime time.Time
	lastErrorTime  time.Time
}
321

322
type nodeServiceMeta struct {
323
        serviceEp metadata.ServiceEndpoint
324
}
325

326
type nodePodNetMeta struct {
327
        nodePods            map[string]bool
328
        podNetIps           metadata.NetIps
329
        podNetIpsAnnotation string
330
}
331

332
// openstackOpflexOdevInfo is the value type of
// AciController.openStackFabricPathDnMap: a string set (opflexODevDn)
// together with a fabric path DN.
type openstackOpflexOdevInfo struct {
	opflexODevDn map[string]struct{}
	fabricPathDn string
}
336

337
// serviceMeta is the value type of AciController.serviceMetaCache. It keeps
// three IP lists for a service: requested, ingress and static ingress.
type serviceMeta struct {
	requestedIps     []net.IP
	ingressIps       []net.IP
	staticIngressIps []net.IP
}
342

343
// ipIndexEntry pairs an IP network with a set of string keys. Its Network
// method returns ipNet.
type ipIndexEntry struct {
	ipNet net.IPNet
	keys  map[string]bool
}
347

348
type targetPort struct {
349
        proto v1.Protocol
350
        ports map[int]bool
351
}
352

353
type portIndexEntry struct {
354
        port              targetPort
355
        serviceKeys       map[string]bool
356
        networkPolicyKeys map[string]bool
357
}
358

359
// namedPortServiceIndexPort holds a target port name and a set of port
// numbers (resolvedPorts). It is the element type of
// namedPortServiceIndexEntry.
type namedPortServiceIndexPort struct {
	targetPortName string
	resolvedPorts  map[int]bool
}
363

364
// namedPortServiceIndexEntry maps a string key to a
// namedPortServiceIndexPort. It is the element type of
// AciController.namedPortServiceIndex.
type namedPortServiceIndexEntry map[string]*namedPortServiceIndexPort
365

366
// portRangeSnat is a port range given by its start and end values.
type portRangeSnat struct {
	start int
	end   int
}
370

371
// EndPointData is the PodIF information the controller keeps per entry of
// AciController.podIftoEp: MAC address, EPG, namespace and application
// profile.
type EndPointData struct {
	MacAddr    string
	EPG        string
	Namespace  string
	AppProfile string
}
378

379
// ctrPortNameEntry is the value type of AciController.ctrPortNameCache,
// which is indexed by container port name.
type ctrPortNameEntry struct {
	// protocol+port key to the set of pods
	ctrNmpToPods map[string]map[string]bool
}
383

384
// LinkData holds a list of links and a list of pods. It is the innermost
// value type of AdditionalNetworkMeta.FabricLink.
type LinkData struct {
	Link []string
	Pods []string
}
388

389
// RoutedNodeData holds an address string and an integer index. It is the
// value type of RoutedNetworkData.nodeMap.
type RoutedNodeData struct {
	addr string
	idx  int
}
393

394
type RoutedNetworkData struct {
395
        subnet       string
396
        netAddress   string
397
        maskLen      int
398
        numAllocated int
399
        maxAddresses int
400
        baseAddress  net.IP
401
        nodeMap      map[string]RoutedNodeData
402
        availableMap map[int]bool
403
}
404

405
type AdditionalNetworkMeta struct {
406
        NetworkName string
407
        EncapVlan   string
408
        //node+localiface->fabricLinks
409
        FabricLink map[string]map[string]LinkData
410
        NodeCache  map[string]*fabattv1.NodeFabricNetworkAttachment
411
        Mode       util.EncapMode
412
}
413

414
type ServiceEndPointType interface {
415
        InitClientInformer(kubeClient *kubernetes.Clientset)
416
        Run(stopCh <-chan struct{})
417
        Wait(stopCh <-chan struct{})
418
        UpdateServicesForNode(nodename string)
419
        GetnodesMetadata(key string, service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint)
420
        SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool
421
        SetNpServiceAugmentForService(servicekey string, service *v1.Service, prs *portRemoteSubnet,
422
                portAugments map[string]*portServiceAugment, subnetIndex cidranger.Ranger, logger *logrus.Entry)
423
}
424

425
type serviceEndpoint struct {
426
        cont *AciController
427
}
428
type serviceEndpointSlice struct {
429
        cont *AciController
430
}
431

432
// AaepEpgAttachData is the innermost value type of
// AciController.sharedAaepMonitor. It records an encap VLAN, a NAD name and
// namespace, and a flag saying whether the NAD has been created.
type AaepEpgAttachData struct {
	encapVlan     int
	nadName       string
	namespaceName string
	nadCreated    bool
}
438

439
// EpgVlanMap pairs an EPG DN with an encap VLAN number.
type EpgVlanMap struct {
	epgDn     string
	encapVlan int
}
443

444
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
445
        sep.cont.initEndpointsInformerFromClient(kubeClient)
×
446
}
×
447

448
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
×
449
        seps.cont.initEndpointSliceInformerFromClient(kubeClient)
×
450
}
×
451

452
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
1✔
453
        go sep.cont.endpointsInformer.Run(stopCh)
1✔
454
}
1✔
455

456
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
1✔
457
        go seps.cont.endpointSliceInformer.Run(stopCh)
1✔
458
}
1✔
459

460
func (sep *serviceEndpoint) Wait(stopCh <-chan struct{}) {
1✔
461
        cache.WaitForCacheSync(stopCh,
1✔
462
                sep.cont.endpointsInformer.HasSynced,
1✔
463
                sep.cont.serviceInformer.HasSynced)
1✔
464
}
1✔
465

466
func (seps *serviceEndpointSlice) Wait(stopCh <-chan struct{}) {
1✔
467
        seps.cont.log.Debug("Waiting for EndPointSlicecache sync")
1✔
468
        cache.WaitForCacheSync(stopCh,
1✔
469
                seps.cont.endpointSliceInformer.HasSynced,
1✔
470
                seps.cont.serviceInformer.HasSynced)
1✔
471
}
1✔
472

473
func (e *ipIndexEntry) Network() net.IPNet {
1✔
474
        return e.ipNet
1✔
475
}
1✔
476

477
func newNodePodNetMeta() *nodePodNetMeta {
1✔
478
        return &nodePodNetMeta{
1✔
479
                nodePods: make(map[string]bool),
1✔
480
        }
1✔
481
}
1✔
482

483
func createQueue(name string) workqueue.RateLimitingInterface {
1✔
484
        return workqueue.NewNamedRateLimitingQueue(
1✔
485
                workqueue.NewMaxOfRateLimiter(
1✔
486
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
487
                                10*time.Second),
1✔
488
                        &workqueue.BucketRateLimiter{
1✔
489
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
490
                        },
1✔
491
                ),
1✔
492
                "delta")
1✔
493
}
1✔
494

495
func NewController(config *ControllerConfig, env Environment, log *logrus.Logger, unittestmode bool) *AciController {
1✔
496
        cont := &AciController{
1✔
497
                log:          log,
1✔
498
                config:       config,
1✔
499
                env:          env,
1✔
500
                defaultEg:    "",
1✔
501
                defaultSg:    "",
1✔
502
                unitTestMode: unittestmode,
1✔
503

1✔
504
                podQueue:               createQueue("pod"),
1✔
505
                netPolQueue:            createQueue("networkPolicy"),
1✔
506
                qosQueue:               createQueue("qos"),
1✔
507
                netflowQueue:           createQueue("netflow"),
1✔
508
                erspanQueue:            createQueue("erspan"),
1✔
509
                serviceQueue:           createQueue("service"),
1✔
510
                snatQueue:              createQueue("snat"),
1✔
511
                snatNodeInfoQueue:      createQueue("snatnodeinfo"),
1✔
512
                rdConfigQueue:          createQueue("rdconfig"),
1✔
513
                istioQueue:             createQueue("istio"),
1✔
514
                nodeFabNetAttQueue:     createQueue("nodefabricnetworkattachment"),
1✔
515
                netFabConfigQueue:      createQueue("networkfabricconfiguration"),
1✔
516
                nadVlanMapQueue:        createQueue("nadvlanmap"),
1✔
517
                fabricVlanPoolQueue:    createQueue("fabricvlanpool"),
1✔
518
                netFabL3ConfigQueue:    createQueue("networkfabricl3configuration"),
1✔
519
                remIpContQueue:         createQueue("remoteIpContainer"),
1✔
520
                epgDnCacheUpdateQueue:  createQueue("epgDnCache"),
1✔
521
                aaepMonitorConfigQueue: createQueue("aaepepgmap"),
1✔
522
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
523
                        &workqueue.BucketRateLimiter{
1✔
524
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
525
                        }, "sync"),
1✔
526

1✔
527
                configuredPodNetworkIps: newNetIps(),
1✔
528
                podNetworkIps:           newNetIps(),
1✔
529
                serviceIps:              ipam.NewIpCache(),
1✔
530
                staticServiceIps:        newNetIps(),
1✔
531
                nodeServiceIps:          newNetIps(),
1✔
532

1✔
533
                nodeACIPod:       make(map[string]aciPodAnnot),
1✔
534
                nodeACIPodAnnot:  make(map[string]aciPodAnnot),
1✔
535
                nodeOpflexDevice: make(map[string]apicapi.ApicSlice),
1✔
536

1✔
537
                nodeServiceMetaCache:        make(map[string]*nodeServiceMeta),
1✔
538
                nodePodNetCache:             make(map[string]*nodePodNetMeta),
1✔
539
                serviceMetaCache:            make(map[string]*serviceMeta),
1✔
540
                snatPolicyCache:             make(map[string]*ContSnatPolicy),
1✔
541
                snatServices:                make(map[string]bool),
1✔
542
                snatNodeInfoCache:           make(map[string]*nodeinfo.NodeInfo),
1✔
543
                rdConfigCache:               make(map[string]*rdConfig.RdConfig),
1✔
544
                rdConfigSubnetCache:         make(map[string]*rdConfig.RdConfigSpec),
1✔
545
                podIftoEp:                   make(map[string]*EndPointData),
1✔
546
                snatGlobalInfoCache:         make(map[string]map[string]*snatglobalinfo.GlobalInfo),
1✔
547
                istioCache:                  make(map[string]*istiov1.AciIstioOperator),
1✔
548
                crdHandlers:                 make(map[string]func(*AciController, <-chan struct{})),
1✔
549
                ctrPortNameCache:            make(map[string]*ctrPortNameEntry),
1✔
550
                nmPortNp:                    make(map[string]bool),
1✔
551
                hppRef:                      make(map[string]hppReference),
1✔
552
                additionalNetworkCache:      make(map[string]*AdditionalNetworkMeta),
1✔
553
                sharedEncapCache:            make(map[int]*sharedEncapData),
1✔
554
                sharedEncapAepCache:         make(map[string]map[int]bool),
1✔
555
                sharedEncapSviCache:         make(map[int]*NfL3Data),
1✔
556
                sharedEncapVrfCache:         make(map[string]*NfVrfData),
1✔
557
                sharedEncapTenantCache:      make(map[string]*NfTenantData),
1✔
558
                sharedEncapNfcCache:         make(map[int]*NfcData),
1✔
559
                sharedEncapNfcVlanMap:       make(map[int]*NfcData),
1✔
560
                sharedEncapNfcLabelMap:      make(map[string]*NfcData),
1✔
561
                sharedEncapNfcAppProfileMap: make(map[string]bool),
1✔
562
                sharedEncapLabelMap:         make(map[string][]int),
1✔
563
                lldpIfCache:                 make(map[string]*NfLLDPIfData),
1✔
564
                fabricVlanPoolMap:           make(map[string]map[string]string),
1✔
565
                openStackFabricPathDnMap:    make(map[string]openstackOpflexOdevInfo),
1✔
566
                hostFabricPathDnMap:         make(map[string]hostFabricInfo),
1✔
567
                nsRemoteIpCont:              make(map[string]remoteIpConts),
1✔
568
                sharedAaepMonitor:           make(map[string]map[string]*AaepEpgAttachData),
1✔
569
        }
1✔
570
        cont.syncProcessors = map[string]func() bool{
1✔
571
                "snatGlobalInfo": cont.syncSnatGlobalInfo,
1✔
572
                "rdConfig":       cont.syncRdConfig,
1✔
573
                /* Commenting code to remove dependency from istio.io/istio package.
1✔
574
                   Vulnerabilties were detected by quay.io security scan of aci-containers-controller
1✔
575
                   and aci-containers-operator images for istio.io/istio package
1✔
576

1✔
577
                "istioCR":        cont.createIstioCR,
1✔
578
                */
1✔
579
        }
1✔
580
        return cont
1✔
581
}
1✔
582

583
func (cont *AciController) Init() {
×
584
        if cont.config.ChainedMode {
×
585
                cont.log.Info("In chained mode")
×
586
        }
×
587
        if cont.config.VmmLite {
×
588
                cont.log.Info("In VMM lite mode")
×
589
        }
×
590

591
        egdata, err := json.Marshal(cont.config.DefaultEg)
×
592
        if err != nil {
×
593
                cont.log.Error("Could not serialize default endpoint group")
×
594
                panic(err.Error())
×
595
        }
596
        cont.defaultEg = string(egdata)
×
597

×
598
        sgdata, err := json.Marshal(cont.config.DefaultSg)
×
599
        if err != nil {
×
600
                cont.log.Error("Could not serialize default security groups")
×
601
                panic(err.Error())
×
602
        }
603
        cont.defaultSg = string(sgdata)
×
604

×
605
        cont.log.Debug("Initializing IPAM")
×
606
        cont.initIpam()
×
607

×
608
        cont.serviceEndPoints = &serviceEndpointSlice{}
×
609
        cont.serviceEndPoints.(*serviceEndpointSlice).cont = cont
×
610
        cont.log.Info("Initializing ServiceEndpointSlices")
×
611

×
612
        err = cont.env.Init(cont)
×
613
        if err != nil {
×
614
                panic(err.Error())
×
615
        }
616
}
617

618
func (cont *AciController) processQueue(queue workqueue.RateLimitingInterface,
619
        store cache.Store, handler func(interface{}) bool,
620
        deleteHandler func(string) bool,
621
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
622
        go wait.Until(func() {
2✔
623
                for {
2✔
624
                        key, quit := queue.Get()
1✔
625
                        if quit {
2✔
626
                                break
1✔
627
                        }
628

629
                        var requeue bool
1✔
630
                        switch key := key.(type) {
1✔
631
                        case chan struct{}:
×
632
                                close(key)
×
633
                        case string:
1✔
634
                                if strings.HasPrefix(key, "DELETED_") {
2✔
635
                                        delKey := strings.Trim(key, "DELETED_")
1✔
636
                                        requeue = deleteHandler(delKey)
1✔
637
                                } else {
2✔
638
                                        obj, exists, err := store.GetByKey(key)
1✔
639
                                        if err != nil {
1✔
640
                                                cont.log.Debugf("Error fetching object with key %s from store: %v", key, err)
×
641
                                        }
×
642
                                        //Handle Add/Update/Delete
643
                                        if exists && handler != nil {
2✔
644
                                                requeue = handler(obj)
1✔
645
                                        }
1✔
646
                                        //Handle Post Delete
647
                                        if !exists && postDelHandler != nil {
1✔
648
                                                requeue = postDelHandler()
×
649
                                        }
×
650
                                }
651
                        }
652
                        if requeue {
2✔
653
                                queue.AddRateLimited(key)
1✔
654
                        } else {
2✔
655
                                queue.Forget(key)
1✔
656
                        }
1✔
657
                        queue.Done(key)
1✔
658
                }
659
        }, time.Second, stopCh)
660
        <-stopCh
1✔
661
        queue.ShutDown()
1✔
662
}
663

664
func (cont *AciController) processRemIpContQueue(queue workqueue.RateLimitingInterface,
665
        handler func(interface{}) bool,
666
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
667
        go wait.Until(func() {
2✔
668
                for {
2✔
669
                        key, quit := queue.Get()
1✔
670
                        if quit {
2✔
671
                                break
1✔
672
                        }
673

674
                        var requeue bool
1✔
675
                        switch key := key.(type) {
1✔
676
                        case chan struct{}:
×
677
                                close(key)
×
678
                        case string:
1✔
679
                                if handler != nil {
2✔
680
                                        requeue = handler(key)
1✔
681
                                }
1✔
682
                                if postDelHandler != nil {
2✔
683
                                        requeue = postDelHandler()
1✔
684
                                }
1✔
685
                        }
686
                        if requeue {
1✔
687
                                queue.AddRateLimited(key)
×
688
                        } else {
1✔
689
                                queue.Forget(key)
1✔
690
                        }
1✔
691
                        queue.Done(key)
1✔
692

693
                }
694
        }, time.Second, stopCh)
695
        <-stopCh
1✔
696
        queue.ShutDown()
1✔
697
}
698

699
func (cont *AciController) processEpgDnCacheUpdateQueue(queue workqueue.RateLimitingInterface,
700
        handler func(interface{}) bool,
701
        postDelHandler func() bool, stopCh <-chan struct{}) {
1✔
702
        go wait.Until(func() {
2✔
703
                for {
2✔
704
                        key, quit := queue.Get()
1✔
705
                        if quit {
2✔
706
                                break
1✔
707
                        }
708

709
                        var requeue bool
1✔
710
                        switch key := key.(type) {
1✔
711
                        case chan struct{}:
×
712
                                close(key)
×
713
                        case bool:
1✔
714
                                if handler != nil {
2✔
715
                                        requeue = handler(key)
1✔
716
                                }
1✔
717
                                if postDelHandler != nil {
1✔
718
                                        requeue = postDelHandler()
×
719
                                }
×
720
                        }
721
                        if requeue {
1✔
722
                                queue.AddRateLimited(key)
×
723
                        } else {
1✔
724
                                queue.Forget(key)
1✔
725
                        }
1✔
726
                        queue.Done(key)
1✔
727

728
                }
729
        }, time.Second, stopCh)
730
        <-stopCh
1✔
731
        queue.ShutDown()
1✔
732
}
733

734
func (cont *AciController) globalStaticObjs() apicapi.ApicSlice {
1✔
735
        return apicapi.ApicSlice{}
1✔
736
}
1✔
737

738
func (cont *AciController) aciNameForKey(ktype, key string) string {
1✔
739
        return util.AciNameForKey(cont.config.AciPrefix, ktype, key)
1✔
740
}
1✔
741

742
func (cont *AciController) initStaticObjs() {
1✔
743
        cont.env.InitStaticAciObjects()
1✔
744
        cont.apicConn.WriteStaticApicObjects(cont.config.AciPrefix+"_static",
1✔
745
                cont.globalStaticObjs())
1✔
746
}
1✔
747

748
func (cont *AciController) vmmDomainProvider() (vmmProv string) {
1✔
749
        vmmProv = "Kubernetes"
1✔
750
        if strings.ToLower(cont.config.AciVmmDomainType) == "openshift" {
1✔
751
                vmmProv = "OpenShift"
×
752
        }
×
753
        return
1✔
754
}
755

756
func (cont *AciController) Run(stopCh <-chan struct{}) {
1✔
757
        var err error
1✔
758
        var privKey []byte
1✔
759
        var apicCert []byte
1✔
760

1✔
761
        cont.config.AciVrfDn = "uni/tn-" + cont.config.AciVrfTenant + "/ctx-" + cont.config.AciVrf
1✔
762

1✔
763
        if cont.config.ApicPrivateKeyPath != "" {
1✔
764
                privKey, err = os.ReadFile(cont.config.ApicPrivateKeyPath)
×
765
                if err != nil {
×
766
                        panic(err)
×
767
                }
768
        }
769
        if cont.config.ApicCertPath != "" {
1✔
770
                apicCert, err = os.ReadFile(cont.config.ApicCertPath)
×
771
                if err != nil {
×
772
                        panic(err)
×
773
                }
774
        }
775
        // If not defined, default is 1800
776
        if cont.config.ApicRefreshTimer == "" {
2✔
777
                cont.config.ApicRefreshTimer = "1800"
1✔
778
        }
1✔
779
        refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
1✔
780
        if err != nil {
1✔
781
                panic(err)
×
782
        }
783
        cont.log.Info("ApicRefreshTimer conf is set to: ", refreshTimeout)
1✔
784

1✔
785
        // Bailout if the refreshTimeout is more than 12Hours or less than 5Mins
1✔
786
        if refreshTimeout > (12*60*60) || refreshTimeout < (5*60) {
1✔
787
                cont.log.Info("ApicRefreshTimer can't be more than 12Hrs or less than 5Mins")
×
788
                panic(err)
×
789
        }
790

791
        // If RefreshTickerAdjustInterval is not defined, default to 150Sec.
792
        if cont.config.ApicRefreshTickerAdjust == "" {
2✔
793
                cont.config.ApicRefreshTickerAdjust = "210"
1✔
794
        }
1✔
795
        refreshTickerAdjust, err := strconv.Atoi(cont.config.ApicRefreshTickerAdjust)
1✔
796
        if err != nil {
1✔
797
                panic(err)
×
798
        }
799

800
        // If not defined, default to 900s
801
        if cont.config.LeafRebootCheckInterval == 0 {
2✔
802
                cont.config.LeafRebootCheckInterval = 900
1✔
803
        }
1✔
804

805
        //If ApicSubscriptionDelay is not defined, default to 100ms
806
        if cont.config.ApicSubscriptionDelay == 0 {
2✔
807
                cont.config.ApicSubscriptionDelay = 100
1✔
808
        }
1✔
809
        cont.log.Info("ApicSubscriptionDelay conf is set to: ", cont.config.ApicSubscriptionDelay)
1✔
810

1✔
811
        // If OpflexDeviceDeleteTimeout is not defined, default to 1800s
1✔
812
        if cont.config.OpflexDeviceDeleteTimeout == 0 {
2✔
813
                cont.config.OpflexDeviceDeleteTimeout = 1800
1✔
814
        }
1✔
815

816
        // If OpflexDeviceReconnectWaitTimeout is not defined, default to 25s
817
        if cont.config.OpflexDeviceReconnectWaitTimeout == 0 {
2✔
818
                cont.config.OpflexDeviceReconnectWaitTimeout = 25
1✔
819
        }
1✔
820
        cont.log.Debug("OpflexDeviceReconnectWaitTimeout set to: ", cont.config.OpflexDeviceReconnectWaitTimeout)
1✔
821

1✔
822
        // If SleepTimeSnatGlobalInfoSync is not defined, default to 60
1✔
823
        if cont.config.SleepTimeSnatGlobalInfoSync == 0 {
2✔
824
                cont.config.SleepTimeSnatGlobalInfoSync = 60
1✔
825
        }
1✔
826

827
        // If not defined, default to 32
828
        if cont.config.PodIpPoolChunkSize == 0 {
2✔
829
                cont.config.PodIpPoolChunkSize = 32
1✔
830
        }
1✔
831
        if !cont.isCNOEnabled() {
2✔
832
                cont.log.Info("PodIpPoolChunkSize conf is set to: ", cont.config.PodIpPoolChunkSize)
1✔
833
        }
1✔
834

835
        // If ApicConnectionRetryLimit is not defined, default to 5
836
        if cont.config.ApicConnectionRetryLimit == 0 {
2✔
837
                cont.config.ApicConnectionRetryLimit = 5
1✔
838
        }
1✔
839
        cont.log.Debug("ApicConnectionRetryLimit set to: ", cont.config.ApicConnectionRetryLimit)
1✔
840

1✔
841
        // If not valid, default to 5000-65000
1✔
842
        // other permissible values 1-65000
1✔
843
        defStart := 5000
1✔
844
        defEnd := 65000
1✔
845
        if cont.config.SnatDefaultPortRangeStart == 0 {
2✔
846
                cont.config.SnatDefaultPortRangeStart = defStart
1✔
847
        }
1✔
848
        if cont.config.SnatDefaultPortRangeEnd == 0 {
2✔
849
                cont.config.SnatDefaultPortRangeEnd = defEnd
1✔
850
        }
1✔
851
        if cont.config.SnatDefaultPortRangeStart < 0 || cont.config.SnatDefaultPortRangeEnd < 0 ||
1✔
852
                cont.config.SnatDefaultPortRangeStart > defEnd || cont.config.SnatDefaultPortRangeEnd > defEnd ||
1✔
853
                cont.config.SnatDefaultPortRangeStart > cont.config.SnatDefaultPortRangeEnd {
1✔
854
                cont.config.SnatDefaultPortRangeStart = defStart
×
855
                cont.config.SnatDefaultPortRangeEnd = defEnd
×
856
        }
×
857

858
        // Set default value for pbr programming delay if services list is not empty
859
        // and delay value is empty
860
        if cont.config.ServiceGraphEndpointAddDelay.Delay == 0 &&
1✔
861
                cont.config.ServiceGraphEndpointAddDelay.Services != nil &&
1✔
862
                len(cont.config.ServiceGraphEndpointAddDelay.Services) > 0 {
1✔
863
                cont.config.ServiceGraphEndpointAddDelay.Delay = 90
×
864
        }
×
865
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
866
                cont.log.Info("ServiceGraphEndpointAddDelay set to: ", cont.config.ServiceGraphEndpointAddDelay.Delay)
×
867
        }
×
868

869
        // Set contract scope for snat svc graph to global by default
870
        if cont.config.SnatSvcContractScope == "" {
2✔
871
                cont.config.SnatSvcContractScope = "global"
1✔
872
        }
1✔
873
        if cont.config.MaxSvcGraphNodes == 0 {
2✔
874
                cont.config.MaxSvcGraphNodes = 32
1✔
875
        }
1✔
876
        if !cont.isCNOEnabled() {
2✔
877
                cont.log.Info("Max number of nodes per svc graph is set to: ", cont.config.MaxSvcGraphNodes)
1✔
878
        }
1✔
879
        cont.apicConn, err = apicapi.New(cont.log, cont.config.ApicHosts,
1✔
880
                cont.config.ApicUsername, cont.config.ApicPassword,
1✔
881
                privKey, apicCert, cont.config.AciPrefix,
1✔
882
                refreshTimeout, refreshTickerAdjust, cont.config.LeafRebootCheckInterval, cont.config.ApicSubscriptionDelay,
1✔
883
                cont.config.AciVrfTenant, cont.UpdateLLDPIfLocked, cont.isCNOEnabled())
1✔
884
        if err != nil {
1✔
885
                panic(err)
×
886
        }
887

888
        cont.apicConn.FilterOpflexDevice = cont.config.FilterOpflexDevice
1✔
889
        cont.apicConn.Flavor = cont.config.Flavor
1✔
890
        cont.apicConn.VmmDomain = cont.config.AciVmmDomain
1✔
891
        cont.apicConn.ReconnectRetryLimit = cont.config.ApicConnectionRetryLimit
1✔
892
        cont.apicConn.RequestRetryDelay = cont.config.ApicRequestRetryDelay
1✔
893
        cont.apicConn.EnableRequestRetry = cont.config.EnableApicRequestRetry
1✔
894

1✔
895
        if len(cont.config.ApicHosts) != 0 {
1✔
896
        APIC_SWITCH:
×
897
                cont.log.WithFields(logrus.Fields{
×
898
                        "mod":  "APICAPI",
×
899
                        "host": cont.apicConn.Apic[cont.apicConn.ApicIndex],
×
900
                }).Debug("Connecting to APIC to determine the Version")
×
901

×
902
                version, err := cont.apicConn.GetVersion()
×
903
                if err != nil {
×
904
                        cont.log.Error("Could not get APIC version, switching to next APIC")
×
905
                        cont.apicConn.ApicIndex = (cont.apicConn.ApicIndex + 1) % len(cont.apicConn.Apic)
×
906
                        time.Sleep(cont.apicConn.ReconnectInterval)
×
907
                        goto APIC_SWITCH
×
908
                }
909
                cont.apicConn.CachedVersion = version
×
910
                apicapi.ApicVersion = version
×
911
                if version >= "4.2(4i)" {
×
912
                        cont.apicConn.SnatPbrFltrChain = true
×
913
                } else {
×
914
                        cont.apicConn.SnatPbrFltrChain = false
×
915
                }
×
916
                if version >= "5.2" {
×
917
                        cont.vmmClusterFaultSupported = true
×
918
                }
×
919
        } else { // For unit-tests
1✔
920
                cont.apicConn.SnatPbrFltrChain = true
1✔
921
        }
1✔
922

923
        if !cont.isCNOEnabled() {
2✔
924
                cont.log.Debug("SnatPbrFltrChain set to:", cont.apicConn.SnatPbrFltrChain)
1✔
925
                // Make sure Pod/NodeBDs are assoicated to same VRF.
1✔
926
                if len(cont.config.ApicHosts) != 0 && cont.config.AciPodBdDn != "" && cont.config.AciNodeBdDn != "" {
1✔
927
                        var expectedVrfRelations []string
×
928
                        expectedVrfRelations = append(expectedVrfRelations, cont.config.AciPodBdDn, cont.config.AciNodeBdDn)
×
929
                        cont.log.Debug("expectedVrfRelations:", expectedVrfRelations)
×
930
                        err = cont.apicConn.ValidateAciVrfAssociation(cont.config.AciVrfDn, expectedVrfRelations)
×
931
                        if err != nil {
×
932
                                cont.log.Error("Pod/NodeBDs and AciL3Out VRF association is incorrect")
×
933
                                panic(err)
×
934
                        }
935
                }
936
        }
937

938
        if len(cont.config.ApicHosts) != 0 && cont.vmmClusterFaultSupported && !cont.isCNOEnabled() {
1✔
939
                //Clear fault instances when the controller starts
×
940
                cont.clearFaultInstances()
×
941
                //Subscribe for vmmEpPD for a given domain
×
942
                var tnTargetFilterEpg string
×
943
                tnTargetFilterEpg += fmt.Sprintf("uni/vmmp-%s/dom-%s/", cont.vmmDomainProvider(), cont.config.AciVmmDomain)
×
944
                subnetTargetFilterEpg := fmt.Sprintf("and(wcard(vmmEpPD.dn,\"%s\"))", tnTargetFilterEpg)
×
945
                cont.apicConn.AddSubscriptionClass("vmmEpPD",
×
946
                        []string{"vmmEpPD"}, subnetTargetFilterEpg)
×
947
                cont.apicConn.SetSubscriptionHooks("vmmEpPD",
×
948
                        func(obj apicapi.ApicObject) bool {
×
949
                                cont.vmmEpPDChanged(obj)
×
950
                                return true
×
951
                        },
×
952
                        func(dn string) {
×
953
                                cont.vmmEpPDDeleted(dn)
×
954
                        })
×
955
        }
956

957
        cont.initStaticObjs()
1✔
958

1✔
959
        err = cont.env.PrepareRun(stopCh)
1✔
960
        if err != nil {
1✔
961
                panic(err.Error())
×
962
        }
963

964
        infraQuerierInitDone := false
1✔
965

1✔
966
        cont.apicConn.FullSyncHook = func() {
1✔
967
                // put a channel into each work queue and wait on it to
×
968
                // checkpoint object syncing in response to new subscription
×
969
                // updates
×
970
                cont.log.Debug("Starting checkpoint")
×
971
                var chans []chan struct{}
×
972
                qs := make([]workqueue.RateLimitingInterface, 0)
×
973
                _, ok := cont.env.(*K8sEnvironment)
×
974
                if ok {
×
975
                        qs = []workqueue.RateLimitingInterface{cont.podQueue}
×
976
                        if !cont.isCNOEnabled() {
×
977
                                if !cont.config.DisableHppRendering {
×
978
                                        qs = append(qs, cont.netPolQueue)
×
979
                                }
×
980
                                if cont.config.EnableHppDirect {
×
981
                                        qs = append(qs, cont.remIpContQueue)
×
982
                                }
×
983
                                qs = append(qs, cont.qosQueue, cont.serviceQueue,
×
984
                                        cont.snatQueue, cont.netflowQueue, cont.snatNodeInfoQueue,
×
985
                                        cont.rdConfigQueue, cont.erspanQueue,
×
986
                                        cont.epgDnCacheUpdateQueue)
×
987
                        }
988
                }
989
                for _, q := range qs {
×
990
                        c := make(chan struct{})
×
991
                        chans = append(chans, c)
×
992
                        q.Add(c)
×
993
                }
×
994
                for _, c := range chans {
×
995
                        <-c
×
996
                }
×
997
                cont.log.Debug("Checkpoint complete")
×
NEW
998

×
NEW
999
                if !infraQuerierInitDone && !cont.isCNOEnabled() && cont.config.AciMultipod {
×
NEW
1000
                        if cont.initInfraQuerierSubnet() {
×
NEW
1001
                                infraQuerierInitDone = true
×
NEW
1002
                        }
×
1003
                }
1004
        }
1005

1006
        if len(cont.config.ApicHosts) != 0 && !cont.isCNOEnabled() {
1✔
1007
                cont.BuildSubnetDnCache(cont.config.AciVrfDn, cont.config.AciVrfDn)
×
1008
                cont.scheduleRdConfig()
×
1009
                if strings.Contains(cont.config.Flavor, "openstack") {
×
1010
                        cont.setOpenStackSystemId()
×
1011
                }
×
1012
        }
1013

1014
        if !cont.isCNOEnabled() {
2✔
1015
                if cont.config.AciPolicyTenant != cont.config.AciVrfTenant {
2✔
1016
                        cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1017
                                []string{"hostprotPol"})
1✔
1018
                }
1✔
1019
        } else {
1✔
1020
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciPolicyTenant,
1✔
1021
                        []string{"fvBD", "fvAp"})
1✔
1022
                cont.apicConn.AddSubscriptionClass("fvnsVlanInstP",
1✔
1023
                        []string{"fvnsVlanInstP"}, "")
1✔
1024
                cont.apicConn.AddSubscriptionClass("infraRsDomP",
1✔
1025
                        []string{"infraRsDomP"}, "")
1✔
1026
                cont.apicConn.AddSubscriptionClass("physDomP",
1✔
1027
                        []string{"physDomP"}, "")
1✔
1028
                cont.apicConn.AddSubscriptionClass("l3extDomP",
1✔
1029
                        []string{"l3extDomP"}, "")
1✔
1030
                cont.apicConn.AddSubscriptionClass("infraRsVlanNs",
1✔
1031
                        []string{"infraRsVlanNs"}, "")
1✔
1032
                cont.apicConn.AddSubscriptionClass("infraGeneric",
1✔
1033
                        []string{"infraGeneric", "infraRsFuncToEpg"}, "")
1✔
1034
                cont.apicConn.AddSubscriptionClass("l3extOut",
1✔
1035
                        []string{"l3extInstP", "l3extSubnet", "fvRsCons", "fvRsProv", "l3extRsEctx", "l3extRsL3DomAtt", "l3extLNodeP", "l3extRsNodeL3OutAtt", "ipRouteP", "ipNexthopP", "l3extLIfP", "l3extVirtualLIfP", "l3extRsDynPathAtt",
1✔
1036
                                "l3extRsPathL3OutAtt", "l3extMember", "l3extIp", "bgpExtP", "bgpPeerP", "bgpAsP", "bgpLocalAsnP", "bgpRsPeerPfxPol"}, "")
1✔
1037
                cont.apicConn.AddSubscriptionClass("bgpPeerPfxPol",
1✔
1038
                        []string{"bgpPeerPfxPol"}, "")
1✔
1039
        }
1✔
1040

1041
        if cont.config.VmmLite {
1✔
1042
                cont.apicConn.AddSubscriptionClass("infraAttEntityP",
×
1043
                        []string{"infraRsFuncToEpg"}, "")
×
1044

×
1045
                cont.apicConn.SetSubscriptionHooks(
×
1046
                        "infraAttEntityP",
×
1047
                        func(obj apicapi.ApicObject) bool {
×
1048
                                cont.log.Debug("EPG attached to AAEP")
×
1049
                                cont.handleAaepEpgAttach(obj)
×
1050
                                return true
×
1051
                        },
×
1052
                        func(dn string) {
×
1053
                                cont.log.Debug("EPG detached from AAEP")
×
1054
                                cont.handleAaepEpgDetach(dn)
×
1055
                        },
×
1056
                )
1057
        }
1058

1059
        if !cont.isCNOEnabled() {
2✔
1060
                // Websocket notifications for objects under vrfTenant are filtered in
1✔
1061
                // handleSocketUpdate() using DN-based prefix matching (conn.prefix + "_").
1✔
1062
                // All MO names created via AciNameForKey contain this prefix pattern.
1✔
1063
                subscribeMo := []string{"fvBD", "vnsLDevVip", "vnsAbsGraph", "vnsLDevCtx",
1✔
1064
                        "vzFilter", "vzBrCP", "l3extInstP", "vnsSvcRedirectPol",
1✔
1065
                        "vnsRedirectHealthGroup", "fvIPSLAMonitoringPol"}
1✔
1066
                if cont.config.AciPolicyTenant == cont.config.AciVrfTenant {
1✔
1067
                        subscribeMo = append(subscribeMo, "hostprotPol")
×
1068
                }
×
1069
                cont.apicConn.AddSubscriptionDn("uni/tn-"+cont.config.AciVrfTenant,
1✔
1070
                        subscribeMo)
1✔
1071
                cont.apicConn.AddSubscriptionDn(fmt.Sprintf("uni/tn-%s/out-%s",
1✔
1072
                        cont.config.AciVrfTenant, cont.config.AciL3Out),
1✔
1073
                        []string{"fvRsCons"})
1✔
1074
                vmmDn := fmt.Sprintf("comp/prov-%s/ctrlr-[%s]-%s/injcont",
1✔
1075
                        cont.env.VmmPolicy(), cont.config.AciVmmDomain,
1✔
1076
                        cont.config.AciVmmController)
1✔
1077
                // Before subscribing to vmm objects, add vmmInjectedLabel as a child after explicit APIC version check
1✔
1078
                // Since it is not supported for APIC versions < "5.0"
1✔
1079
                cont.addVmmInjectedLabel()
1✔
1080
                cont.apicConn.AddSyncDn(vmmDn,
1✔
1081
                        []string{"vmmInjectedHost", "vmmInjectedNs"})
1✔
1082

1✔
1083
                var tnTargetFilter string
1✔
1084
                if len(cont.config.AciVrfRelatedTenants) > 0 {
1✔
1085
                        for _, tn := range cont.config.AciVrfRelatedTenants {
×
1086
                                tnTargetFilter += fmt.Sprintf("tn-%s/|", tn)
×
1087
                        }
×
1088
                } else {
1✔
1089
                        tnTargetFilter += fmt.Sprintf("tn-%s/|tn-%s/",
1✔
1090
                                cont.config.AciPolicyTenant, cont.config.AciVrfTenant)
1✔
1091
                }
1✔
1092
                subnetTargetFilter := fmt.Sprintf("and(wcard(fvSubnet.dn,\"%s\"))",
1✔
1093
                        tnTargetFilter)
1✔
1094
                cont.apicConn.AddSubscriptionClass("fvSubnet",
1✔
1095
                        []string{"fvSubnet"}, subnetTargetFilter)
1✔
1096

1✔
1097
                cont.apicConn.SetSubscriptionHooks("fvSubnet",
1✔
1098
                        func(obj apicapi.ApicObject) bool {
1✔
1099
                                cont.SubnetChanged(obj, cont.config.AciVrfDn)
×
1100
                                return true
×
1101
                        },
×
1102
                        func(dn string) {
×
1103
                                cont.SubnetDeleted(dn)
×
1104
                        })
×
1105

1106
                cont.apicConn.AddSubscriptionClass("opflexODev",
1✔
1107
                        []string{"opflexODev"}, "")
1✔
1108

1✔
1109
                cont.apicConn.SetSubscriptionHooks("opflexODev",
1✔
1110
                        func(obj apicapi.ApicObject) bool {
1✔
1111
                                cont.opflexDeviceChanged(obj)
×
1112
                                return true
×
1113
                        },
×
1114
                        func(dn string) {
×
1115
                                cont.opflexDeviceDeleted(dn)
×
1116
                        })
×
1117

1118
                if !cont.config.DisableServiceVlanPreprovisioning && !strings.Contains(cont.config.Flavor, "openstack") {
2✔
1119
                        if cont.config.AEP == "" {
2✔
1120
                                cont.log.Error("AEP is missing in configuration, preprovisioning of service vlan will be disabled")
1✔
1121
                        } else {
1✔
1122
                                infraRtAttEntPFilter := fmt.Sprintf("and(wcard(infraRtAttEntP.dn,\"/attentp-%s/\"))", cont.config.AEP)
×
1123
                                cont.apicConn.AddSubscriptionClass("infraRtAttEntP",
×
1124
                                        []string{"infraRtAttEntP"}, infraRtAttEntPFilter)
×
1125

×
1126
                                // For bare metal, the infraRtAttEntP associated with an AEP will be empty.
×
1127
                                // We should not receive any updates for such cases.
×
1128
                                cont.apicConn.SetSubscriptionHooks("infraRtAttEntP",
×
1129
                                        func(obj apicapi.ApicObject) bool {
×
1130
                                                cont.infraRtAttEntPChanged(obj)
×
1131
                                                return true
×
1132
                                        },
×
1133
                                        func(dn string) {
×
1134
                                                cont.infraRtAttEntPDeleted(dn)
×
1135
                                        })
×
1136

1137
                                cont.apicConn.AddSubscriptionClass("vpcIf",
×
1138
                                        []string{"vpcIf"}, "")
×
1139

×
1140
                                cont.apicConn.SetSubscriptionHooks("vpcIf",
×
1141
                                        func(obj apicapi.ApicObject) bool {
×
1142
                                                cont.vpcIfChanged(obj)
×
1143
                                                return true
×
1144
                                        },
×
1145
                                        func(dn string) {
×
1146
                                                cont.vpcIfDeleted(dn)
×
1147
                                        })
×
1148
                        }
1149
                }
1150

1151
                cont.apicConn.VersionUpdateHook =
1✔
1152
                        func() {
1✔
1153
                                cont.initStaticServiceObjs()
×
1154
                        }
×
1155
        } else if cont.config.VmmLite {
1✔
1156
                cont.apicConn.VMMLiteSyncHook = func() {
×
1157
                        cont.syncAndCleanNadCache()
×
1158
                        cont.syncAndCleanNads()
×
1159
                }
×
1160
        }
1161
        go cont.apicConn.Run(stopCh)
1✔
1162
}
1163

1164
func (cont *AciController) syncNodeAciPods(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1165
        cont.log.Debug("Go routine to periodically check the aci pod information in the opflexOdev of a node")
1✔
1166
        ticker := time.NewTicker(seconds * time.Second)
1✔
1167
        defer ticker.Stop()
1✔
1168

1✔
1169
        for {
2✔
1170
                select {
1✔
1171
                case <-ticker.C:
1✔
1172
                        if cont.config.EnableOpflexAgentReconnect {
1✔
1173
                                cont.checkChangeOfOpflexOdevAciPod()
×
1174
                        }
×
1175
                        if cont.config.AciMultipod {
1✔
1176
                                cont.checkChangeOfOdevAciPod()
×
1177
                        }
×
1178
                case <-stopCh:
1✔
1179
                        return
1✔
1180
                }
1181
        }
1182
}
1183

NEW
1184
func (cont *AciController) initInfraQuerierSubnet() bool {
×
NEW
1185
        if !cont.updateInfraQuerierSubnet() {
×
NEW
1186
                return false
×
NEW
1187
        }
×
1188

NEW
1189
        for _, nodeName := range cont.nodeIndexer.ListKeys() {
×
NEW
1190
                nodeObj, exists, err := cont.nodeIndexer.GetByKey(nodeName)
×
NEW
1191
                if err != nil || !exists {
×
NEW
1192
                        continue
×
1193
                }
NEW
1194
                node := nodeObj.(*v1.Node)
×
NEW
1195
                if node.ObjectMeta.Annotations[metadata.InfraQuerierSubnetAnnotation] != "" {
×
NEW
1196
                        continue
×
1197
                }
NEW
1198
                go cont.env.NodeAnnotationChanged(nodeName)
×
1199
        }
1200

NEW
1201
        return true
×
1202
}
1203

1204
func (cont *AciController) syncOpflexDevices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1205
        cont.log.Debug("Go routine to periodically delete old opflexdevices started")
1✔
1206
        ticker := time.NewTicker(seconds * time.Second)
1✔
1207
        defer ticker.Stop()
1✔
1208

1✔
1209
        for {
2✔
1210
                select {
1✔
1211
                case <-ticker.C:
1✔
1212
                        cont.deleteOldOpflexDevices()
1✔
1213
                case <-stopCh:
1✔
1214
                        return
1✔
1215
                }
1216
        }
1217
}
1218

1219
func (cont *AciController) syncDelayedEpSlices(stopCh <-chan struct{}, seconds time.Duration) {
1✔
1220
        cont.log.Debug("Go routine to periodically check and process the epslices having delay adding in service")
1✔
1221
        ticker := time.NewTicker(seconds * time.Second)
1✔
1222
        defer ticker.Stop()
1✔
1223

1✔
1224
        for {
2✔
1225
                select {
1✔
1226
                case <-ticker.C:
1✔
1227
                        cont.processDelayedEpSlices()
1✔
1228
                case <-stopCh:
1✔
1229
                        return
1✔
1230
                }
1231
        }
1232
}
1233

1234
func (cont *AciController) snatGlobalInfoSync(stopCh <-chan struct{}, seconds int) {
1✔
1235
        time.Sleep(time.Duration(seconds) * time.Second)
1✔
1236
        cont.log.Debug("Go routine to periodically sync globalinfo and nodeinfo started")
1✔
1237
        iteration := 0
1✔
1238
        for {
2✔
1239
                // To avoid noisy logs, only printing once in 5 minutes
1✔
1240
                if iteration%5 == 0 {
2✔
1241
                        cont.log.Debug("Syncing GlobalInfo with Node infos")
1✔
1242
                }
1✔
1243
                var nodeInfos []*nodeinfo.NodeInfo
1✔
1244
                cont.indexMutex.Lock()
1✔
1245
                cache.ListAll(cont.snatNodeInfoIndexer, labels.Everything(),
1✔
1246
                        func(nodeInfoObj interface{}) {
2✔
1247
                                nodeInfo := nodeInfoObj.(*nodeinfo.NodeInfo)
1✔
1248
                                nodeInfos = append(nodeInfos, nodeInfo)
1✔
1249
                        })
1✔
1250
                expectedmap := make(map[string]map[string]bool)
1✔
1251
                for _, glinfo := range cont.snatGlobalInfoCache {
2✔
1252
                        for nodename, entry := range glinfo {
2✔
1253
                                if _, found := expectedmap[nodename]; !found {
2✔
1254
                                        newentry := make(map[string]bool)
1✔
1255
                                        newentry[entry.SnatPolicyName] = true
1✔
1256
                                        expectedmap[nodename] = newentry
1✔
1257
                                } else {
2✔
1258
                                        currententry := expectedmap[nodename]
1✔
1259
                                        currententry[entry.SnatPolicyName] = true
1✔
1260
                                        expectedmap[nodename] = currententry
1✔
1261
                                }
1✔
1262
                        }
1263
                }
1264
                cont.indexMutex.Unlock()
1✔
1265

1✔
1266
                for _, value := range nodeInfos {
2✔
1267
                        marked := false
1✔
1268
                        policyNames := value.Spec.SnatPolicyNames
1✔
1269
                        nodeName := value.ObjectMeta.Name
1✔
1270
                        _, ok := expectedmap[nodeName]
1✔
1271
                        if !ok && len(policyNames) > 0 {
2✔
1272
                                cont.log.Info("Adding missing entry in snatglobalinfo for node: ", nodeName)
1✔
1273
                                cont.log.Debug("No snat policies found in snatglobalinfo")
1✔
1274
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1275
                                marked = true
1✔
1276
                        } else if len(policyNames) != len(expectedmap[nodeName]) {
3✔
1277
                                cont.log.Info("Adding missing snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1278
                                cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1279
                                cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1280
                                marked = true
1✔
1281
                        } else {
2✔
1282
                                if len(policyNames) == 0 && len(expectedmap[nodeName]) == 0 {
1✔
1283
                                        // No snatpolicies present
×
1284
                                        continue
×
1285
                                }
1286
                                eq := reflect.DeepEqual(expectedmap[nodeName], policyNames)
1✔
1287
                                if !eq {
2✔
1288
                                        cont.log.Debug("Syncing inconsistent snatpolicy entry in snatglobalinfo for node: ", nodeName)
1✔
1289
                                        cont.log.Debug("Snatpolicy list according to snatglobalinfo: ", expectedmap[nodeName])
1✔
1290
                                        cont.log.Debug("Snatpolicy list according to nodeinfo: ", policyNames)
1✔
1291
                                        marked = true
1✔
1292
                                }
1✔
1293
                        }
1294
                        if marked {
2✔
1295
                                cont.log.Info("Nodeinfo and globalinfo out of sync for node: ", nodeName)
1✔
1296
                                nodeinfokey, err := cache.MetaNamespaceKeyFunc(value)
1✔
1297
                                if err != nil {
1✔
1298
                                        cont.log.Error("Not able to get key for node: ", nodeName)
×
1299
                                        continue
×
1300
                                }
1301
                                cont.log.Info("Queuing nodeinfokey for globalinfo sync: ", nodeinfokey)
1✔
1302
                                cont.queueNodeInfoUpdateByKey(nodeinfokey)
1✔
1303
                        } else if iteration%5 == 0 {
2✔
1304
                                cont.log.Info("Nodeinfo and globalinfo in sync for node: ", nodeName)
1✔
1305
                        }
1✔
1306
                }
1307
                time.Sleep(time.Duration(seconds) * time.Second)
1✔
1308
                iteration++
1✔
1309
        }
1310
}
1311

1312
func (cont *AciController) processSyncQueue(queue workqueue.RateLimitingInterface,
1313
        queueStop <-chan struct{}) {
1✔
1314
        go wait.Until(func() {
2✔
1315
                for {
2✔
1316
                        syncType, quit := queue.Get()
1✔
1317
                        if quit {
2✔
1318
                                break
1✔
1319
                        }
1320
                        var requeue bool
1✔
1321
                        if sType, ok := syncType.(string); ok {
2✔
1322
                                if f, ok := cont.syncProcessors[sType]; ok {
2✔
1323
                                        requeue = f()
1✔
1324
                                }
1✔
1325
                        }
1326
                        if requeue {
1✔
1327
                                queue.AddRateLimited(syncType)
×
1328
                        } else {
1✔
1329
                                queue.Forget(syncType)
1✔
1330
                        }
1✔
1331
                        queue.Done(syncType)
1✔
1332
                }
1333
        }, time.Second, queueStop)
1334
        <-queueStop
1✔
1335
        queue.ShutDown()
1✔
1336
}
1337

1338
func (cont *AciController) scheduleSyncGlobalInfo() {
1✔
1339
        cont.syncQueue.AddRateLimited("snatGlobalInfo")
1✔
1340
}
1✔
1341
func (cont *AciController) scheduleRdConfig() {
×
1342
        cont.syncQueue.AddRateLimited("rdConfig")
×
1343
}
×
1344
func (cont *AciController) scheduleCreateIstioCR() {
×
1345
        cont.syncQueue.AddRateLimited("istioCR")
×
1346
}
×
1347

1348
func (cont *AciController) addVmmInjectedLabel() {
1✔
1349
        if apicapi.ApicVersion >= "5.2" {
1✔
1350
                err := apicapi.AddMetaDataChild("vmmInjectedNs", "vmmInjectedLabel")
×
1351
                if err != nil {
×
1352
                        panic(err.Error())
×
1353
                }
1354
                err = apicapi.AddMetaDataChild("vmmInjectedSvc", "vmmInjectedLabel")
×
1355
                if err != nil {
×
1356
                        panic(err.Error())
×
1357
                }
1358
        }
1359
        if apicapi.ApicVersion >= "5.0" {
2✔
1360
                err := apicapi.AddMetaDataChild("vmmInjectedReplSet", "vmmInjectedLabel")
1✔
1361
                if err != nil {
1✔
1362
                        panic(err.Error())
×
1363
                }
1364
                err = apicapi.AddMetaDataChild("vmmInjectedContGrp", "vmmInjectedLabel")
1✔
1365
                if err != nil {
1✔
1366
                        panic(err.Error())
×
1367
                }
1368
                err = apicapi.AddMetaDataChild("vmmInjectedDepl", "vmmInjectedLabel")
1✔
1369
                if err != nil {
1✔
1370
                        panic(err.Error())
×
1371
                }
1372
        }
1373
}
1374

1375
func (cont *AciController) isCNOEnabled() bool {
1✔
1376
        return cont.config.ChainedMode || cont.config.VmmLite
1✔
1377
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc