• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11713

16 Mar 2026 10:04AM UTC coverage: 62.537% (-0.4%) from 62.911%
11713

Pull #1687

travis-pro

web-flow
Merge 7a3798e9a into 517f6a009
Pull Request #1687: Add infra querier subnet handling for multipod migration

5 of 122 new or added lines in 4 files covered. (4.1%)

11 existing lines in 2 files now uncovered.

13491 of 21573 relevant lines covered (62.54%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.73
/pkg/controller/services.go
1
// Copyright 2016 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "errors"
19
        "fmt"
20
        "net"
21
        "reflect"
22
        "regexp"
23
        "sort"
24
        "strconv"
25
        "strings"
26
        "time"
27

28
        "github.com/noironetworks/aci-containers/pkg/apicapi"
29
        "github.com/noironetworks/aci-containers/pkg/metadata"
30
        "github.com/noironetworks/aci-containers/pkg/util"
31
        "github.com/sirupsen/logrus"
32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
        "k8s.io/apimachinery/pkg/fields"
36
        "k8s.io/apimachinery/pkg/labels"
37
        "k8s.io/apimachinery/pkg/util/intstr"
38
        "k8s.io/client-go/kubernetes"
39
        "k8s.io/client-go/tools/cache"
40
)
41

42
// Defaults used for service contracts and service external subnets.
const (
	// DefaultServiceContractScope is the default service contract scope value.
	DefaultServiceContractScope = "context"

	// DefaultServiceExtSubNetShared is the default service ext subnet
	// scope - enable shared security.
	DefaultServiceExtSubNetShared = false
)
47

48
func (cont *AciController) initEndpointsInformerFromClient(
49
        kubeClient kubernetes.Interface) {
×
50
        cont.initEndpointsInformerBase(
×
51
                cache.NewListWatchFromClient(
×
52
                        kubeClient.CoreV1().RESTClient(), "endpoints",
×
53
                        metav1.NamespaceAll, fields.Everything()))
×
54
}
×
55

56
func (cont *AciController) initEndpointSliceInformerFromClient(
57
        kubeClient kubernetes.Interface) {
×
58
        cont.initEndpointSliceInformerBase(
×
59
                cache.NewListWatchFromClient(
×
60
                        kubeClient.DiscoveryV1().RESTClient(), "endpointslices",
×
61
                        metav1.NamespaceAll, fields.Everything()))
×
62
}
×
63

64
func (cont *AciController) initEndpointSliceInformerBase(listWatch *cache.ListWatch) {
1✔
65
        cont.endpointSliceIndexer, cont.endpointSliceInformer = cache.NewIndexerInformer(
1✔
66
                listWatch, &discovery.EndpointSlice{}, 0,
1✔
67
                cache.ResourceEventHandlerFuncs{
1✔
68
                        AddFunc: func(obj interface{}) {
2✔
69
                                cont.endpointSliceAdded(obj)
1✔
70
                        },
1✔
71
                        UpdateFunc: func(oldobj interface{}, newobj interface{}) {
1✔
72
                                cont.endpointSliceUpdated(oldobj, newobj)
1✔
73
                        },
1✔
74
                        DeleteFunc: func(obj interface{}) {
1✔
75
                                cont.endpointSliceDeleted(obj)
1✔
76
                        },
1✔
77
                },
78
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
79
        )
80
}
81

82
func (cont *AciController) initEndpointsInformerBase(listWatch *cache.ListWatch) {
1✔
83
        cont.endpointsIndexer, cont.endpointsInformer = cache.NewIndexerInformer(
1✔
84
                listWatch, &v1.Endpoints{}, 0,
1✔
85
                cache.ResourceEventHandlerFuncs{
1✔
86
                        AddFunc: func(obj interface{}) {
2✔
87
                                cont.endpointsAdded(obj)
1✔
88
                        },
1✔
89
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
90
                                cont.endpointsUpdated(old, new)
1✔
91
                        },
1✔
92
                        DeleteFunc: func(obj interface{}) {
1✔
93
                                cont.endpointsDeleted(obj)
1✔
94
                        },
1✔
95
                },
96
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
97
        )
98
}
99

100
func (cont *AciController) initServiceInformerFromClient(
101
        kubeClient *kubernetes.Clientset) {
×
102
        cont.initServiceInformerBase(
×
103
                cache.NewListWatchFromClient(
×
104
                        kubeClient.CoreV1().RESTClient(), "services",
×
105
                        metav1.NamespaceAll, fields.Everything()))
×
106
}
×
107

108
func (cont *AciController) initServiceInformerBase(listWatch *cache.ListWatch) {
1✔
109
        cont.serviceIndexer, cont.serviceInformer = cache.NewIndexerInformer(
1✔
110
                listWatch, &v1.Service{}, 0,
1✔
111
                cache.ResourceEventHandlerFuncs{
1✔
112
                        AddFunc: func(obj interface{}) {
2✔
113
                                cont.serviceAdded(obj)
1✔
114
                        },
1✔
115
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
116
                                cont.serviceUpdated(old, new)
1✔
117
                        },
1✔
118
                        DeleteFunc: func(obj interface{}) {
×
119
                                cont.serviceDeleted(obj)
×
120
                        },
×
121
                },
122
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
123
        )
124
}
125

126
func serviceLogger(log *logrus.Logger, as *v1.Service) *logrus.Entry {
1✔
127
        return log.WithFields(logrus.Fields{
1✔
128
                "namespace": as.ObjectMeta.Namespace,
1✔
129
                "name":      as.ObjectMeta.Name,
1✔
130
                "type":      as.Spec.Type,
1✔
131
        })
1✔
132
}
1✔
133

134
func (cont *AciController) queueIPNetPolUpdates(ips map[string]bool) {
1✔
135
        for ipStr := range ips {
2✔
136
                ip := net.ParseIP(ipStr)
1✔
137
                if ip == nil {
1✔
138
                        continue
×
139
                }
140
                entries, err := cont.netPolSubnetIndex.ContainingNetworks(ip)
1✔
141
                if err != nil {
1✔
142
                        cont.log.Error("Corrupted network policy IP index, err: ", err)
×
143
                        return
×
144
                }
×
145
                for _, entry := range entries {
2✔
146
                        for npkey := range entry.(*ipIndexEntry).keys {
2✔
147
                                cont.queueNetPolUpdateByKey(npkey)
1✔
148
                        }
1✔
149
                }
150
        }
151
}
152

153
func (cont *AciController) queuePortNetPolUpdates(ports map[string]targetPort) {
1✔
154
        for portkey := range ports {
2✔
155
                entry := cont.targetPortIndex[portkey]
1✔
156
                if entry == nil {
2✔
157
                        continue
1✔
158
                }
159
                for npkey := range entry.networkPolicyKeys {
2✔
160
                        cont.queueNetPolUpdateByKey(npkey)
1✔
161
                }
1✔
162
        }
163
}
164

165
func (cont *AciController) queueNetPolForEpAddrs(addrs []v1.EndpointAddress) {
1✔
166
        for _, addr := range addrs {
2✔
167
                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" ||
1✔
168
                        addr.TargetRef.Namespace == "" || addr.TargetRef.Name == "" {
2✔
169
                        continue
1✔
170
                }
171
                podkey := addr.TargetRef.Namespace + "/" + addr.TargetRef.Name
1✔
172
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
173
                ps := make(map[string]bool)
1✔
174
                for _, npkey := range npkeys {
2✔
175
                        cont.queueNetPolUpdateByKey(npkey)
1✔
176
                        ps[npkey] = true
1✔
177
                }
1✔
178
                // Process if the  any matching namedport wildcard policy is present
179
                // ignore np already processed policies
180
                cont.queueMatchingNamedNp(ps, podkey)
1✔
181
        }
182
}
183

184
func (cont *AciController) queueMatchingNamedNp(served map[string]bool, podkey string) {
1✔
185
        cont.indexMutex.Lock()
1✔
186
        for npkey := range cont.nmPortNp {
2✔
187
                if _, ok := served[npkey]; !ok {
2✔
188
                        if cont.checkPodNmpMatchesNp(npkey, podkey) {
2✔
189
                                cont.queueNetPolUpdateByKey(npkey)
1✔
190
                        }
1✔
191
                }
192
        }
193
        cont.indexMutex.Unlock()
1✔
194
}
195
func (cont *AciController) queueEndpointsNetPolUpdates(endpoints *v1.Endpoints) {
1✔
196
        for _, subset := range endpoints.Subsets {
2✔
197
                cont.queueNetPolForEpAddrs(subset.Addresses)
1✔
198
                cont.queueNetPolForEpAddrs(subset.NotReadyAddresses)
1✔
199
        }
1✔
200
}
201

202
func (cont *AciController) returnServiceIps(ips []net.IP) {
1✔
203
        for _, ip := range ips {
2✔
204
                if ip.To4() != nil {
2✔
205
                        cont.serviceIps.DeallocateIp(ip)
1✔
206
                } else if ip.To16() != nil {
3✔
207
                        cont.serviceIps.DeallocateIp(ip)
1✔
208
                }
1✔
209
        }
210
}
211

212
func returnIps(pool *netIps, ips []net.IP) {
1✔
213
        for _, ip := range ips {
2✔
214
                if ip.To4() != nil {
2✔
215
                        pool.V4.AddIp(ip)
1✔
216
                } else if ip.To16() != nil {
3✔
217
                        pool.V6.AddIp(ip)
1✔
218
                }
1✔
219
        }
220
}
221

222
func (cont *AciController) staticMonPolName() string {
1✔
223
        return cont.aciNameForKey("monPol", cont.env.ServiceBd())
1✔
224
}
1✔
225

226
func (cont *AciController) staticMonPolDn() string {
1✔
227
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
228
                return fmt.Sprintf("uni/tn-%s/ipslaMonitoringPol-%s",
1✔
229
                        cont.config.AciVrfTenant, cont.staticMonPolName())
1✔
230
        }
1✔
231
        return ""
×
232
}
233

234
func (cont *AciController) staticServiceObjs() apicapi.ApicSlice {
1✔
235
        var serviceObjs apicapi.ApicSlice
1✔
236

1✔
237
        // Service bridge domain
1✔
238
        bdName := cont.aciNameForKey("bd", cont.env.ServiceBd())
1✔
239
        bd := apicapi.NewFvBD(cont.config.AciVrfTenant, bdName)
1✔
240
        if apicapi.ApicVersion >= "6.0(4c)" {
1✔
241
                bd.SetAttr("serviceBdRoutingDisable", "yes")
×
242
        }
×
243
        bd.SetAttr("arpFlood", "yes")
1✔
244
        bd.SetAttr("ipLearning", "no")
1✔
245
        bd.SetAttr("unkMacUcastAct", cont.config.UnknownMacUnicastAction)
1✔
246
        bdToOut := apicapi.NewRsBdToOut(bd.GetDn(), cont.config.AciL3Out)
1✔
247
        bd.AddChild(bdToOut)
1✔
248
        bdToVrf := apicapi.NewRsCtx(bd.GetDn(), cont.config.AciVrf)
1✔
249
        bd.AddChild(bdToVrf)
1✔
250

1✔
251
        bdn := bd.GetDn()
1✔
252
        for _, cidr := range cont.config.NodeServiceSubnets {
2✔
253
                sn := apicapi.NewFvSubnet(bdn, cidr)
1✔
254
                bd.AddChild(sn)
1✔
255
        }
1✔
256
        serviceObjs = append(serviceObjs, bd)
1✔
257

1✔
258
        // Service IP SLA monitoring policy
1✔
259
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
260
                monPol := apicapi.NewFvIPSLAMonitoringPol(cont.config.AciVrfTenant,
1✔
261
                        cont.staticMonPolName())
1✔
262
                monPol.SetAttr("slaFrequency",
1✔
263
                        strconv.Itoa(cont.config.AciServiceMonitorInterval))
1✔
264
                serviceObjs = append(serviceObjs, monPol)
1✔
265
        }
1✔
266

267
        return serviceObjs
1✔
268
}
269

270
func (cont *AciController) initStaticServiceObjs() {
1✔
271
        cont.apicConn.WriteApicObjects(cont.config.AciPrefix+"_service_static",
1✔
272
                cont.staticServiceObjs())
1✔
273
}
1✔
274

275
func (cont *AciController) updateServicesForNode(nodename string) {
1✔
276
        cont.serviceEndPoints.UpdateServicesForNode(nodename)
1✔
277
}
1✔
278

279
// must have index lock
280
func (cont *AciController) getActiveFabricPathDn(node string) string {
×
281
        var fabricPathDn string
×
282
        sz := len(cont.nodeOpflexDevice[node])
×
283
        for i := range cont.nodeOpflexDevice[node] {
×
284
                device := cont.nodeOpflexDevice[node][sz-1-i]
×
285
                if device.GetAttrStr("state") == "connected" {
×
286
                        fabricPathDn = device.GetAttrStr("fabricPathDn")
×
287
                        break
×
288
                }
289
        }
290
        return fabricPathDn
×
291
}
292

293
func (cont *AciController) getOpflexOdevCount(node string) (int, string) {
×
294
        fabricPathDnCount := make(map[string]int)
×
295
        var maxCount int
×
296
        var fabricPathDn string
×
297
        for _, device := range cont.nodeOpflexDevice[node] {
×
298
                fabricpathdn := device.GetAttrStr("fabricPathDn")
×
299
                count, ok := fabricPathDnCount[fabricpathdn]
×
300
                if ok {
×
301
                        fabricPathDnCount[fabricpathdn] = count + 1
×
302
                } else {
×
303
                        fabricPathDnCount[fabricpathdn] = 1
×
304
                }
×
305
                if fabricPathDnCount[fabricpathdn] >= maxCount {
×
306
                        maxCount = fabricPathDnCount[fabricpathdn]
×
307
                        fabricPathDn = fabricpathdn
×
308
                }
×
309
        }
310
        return maxCount, fabricPathDn
×
311
}
312

313
func deleteDevicesFromList(delDevices, devices apicapi.ApicSlice) apicapi.ApicSlice {
×
314
        var newDevices apicapi.ApicSlice
×
315
        for _, device := range devices {
×
316
                found := false
×
317
                for _, delDev := range delDevices {
×
318
                        if reflect.DeepEqual(delDev, device) {
×
319
                                found = true
×
320
                        }
×
321
                }
322
                if !found {
×
323
                        newDevices = append(newDevices, device)
×
324
                }
×
325
        }
326
        return newDevices
×
327
}
328

329
func (cont *AciController) getAciPodSubnet(pod string) (string, error) {
×
330
        podslice := strings.Split(pod, "-")
×
331
        if len(podslice) < 2 {
×
332
                return "", fmt.Errorf("Failed to get podid from pod")
×
333
        }
×
334
        podid := podslice[1]
×
335
        var subnet string
×
336
        args := []string{
×
337
                "query-target=self",
×
338
        }
×
339
        url := fmt.Sprintf("/api/node/mo/uni/controller/setuppol/setupp-%s.json?%s", podid, strings.Join(args, "&"))
×
340
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
341
        if err != nil {
×
342
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
343
                return subnet, err
×
344
        }
×
345
        for _, obj := range apicresp.Imdata {
×
346
                for _, body := range obj {
×
347
                        tepPool, ok := body.Attributes["tepPool"].(string)
×
348
                        if ok {
×
349
                                subnet = tepPool
×
350
                                break
×
351
                        }
352
                }
353
        }
354
        return subnet, nil
×
355
}
356

NEW
357
func (cont *AciController) updateInfraQuerierSubnet() bool {
×
NEW
358
        args := []string{
×
NEW
359
                "query-target=children",
×
NEW
360
                "target-subtree-class=fvSubnet",
×
NEW
361
        }
×
NEW
362
        url := fmt.Sprintf("/api/node/mo/uni/tn-infra/BD-default.json?%s", strings.Join(args, "&"))
×
NEW
363
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
NEW
364
        if err != nil {
×
NEW
365
                cont.log.Debug("Failed to get APIC response for infra default BD subnet lookup, err: ", err.Error())
×
NEW
366
                return false
×
NEW
367
        }
×
368

NEW
369
        var subnet string
×
NEW
370
        for _, obj := range apicresp.Imdata {
×
NEW
371
                for _, body := range obj {
×
NEW
372
                        ctrl, _ := body.Attributes["ctrl"].(string)
×
NEW
373
                        if !strings.Contains(ctrl, "querier") {
×
NEW
374
                                continue
×
375
                        }
NEW
376
                        ip, ok := body.Attributes["ip"].(string)
×
NEW
377
                        if ok && ip != "" {
×
NEW
378
                                subnet = ip
×
NEW
379
                                break
×
380
                        }
381
                }
NEW
382
                if subnet != "" {
×
NEW
383
                        break
×
384
                }
385
        }
NEW
386
        if subnet == "" {
×
NEW
387
                cont.log.Debug("Failed to find querier fvSubnet ip under uni/tn-infra/BD-default")
×
NEW
388
                return false
×
NEW
389
        }
×
390

NEW
391
        cont.indexMutex.Lock()
×
NEW
392
        defer cont.indexMutex.Unlock()
×
NEW
393
        if cont.infraQuerierSubnet == subnet {
×
NEW
394
                return false
×
NEW
395
        }
×
NEW
396
        cont.infraQuerierSubnet = subnet
×
NEW
397
        return true
×
398
}
399

400
func (cont *AciController) isSingleOpflexOdev(fabricPathDn string) (bool, error) {
×
401
        pathSlice := strings.Split(fabricPathDn, "/")
×
402
        if len(pathSlice) > 2 {
×
403
                path := pathSlice[2]
×
404
                // topology/<aci_pod_name>/paths-<id>/pathep-<iface> - fabricPathDn if
×
405
                // host is connnected via single link
×
406
                // topology/<aci_pod_name>/protpaths-<id_1>-<id_2>/pathep-<iface> - fabricPathDn if
×
407
                // host is connected to vpc pair
×
408
                if strings.Contains(path, "protpaths-") {
×
409
                        args := []string{
×
410
                                "query-target=self",
×
411
                        }
×
412
                        url := fmt.Sprintf("/api/node/class/vpcDom.json?%s", strings.Join(args, "&"))
×
413
                        apicresp, err := cont.apicConn.GetApicResponse(url)
×
414
                        if err != nil {
×
415
                                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
416
                                return false, err
×
417
                        }
×
418
                        nodeIdSlice := strings.Split(path, "-")
×
419
                        if len(nodeIdSlice) != 3 {
×
420
                                cont.log.Error("Invalid fabricPathDn ", fabricPathDn)
×
421
                                return false, fmt.Errorf("Invalid path in fabricPathDn %s", fabricPathDn)
×
422
                        }
×
423
                        // As host is connected to vpc pair, check if the status of any of the vpcDom is down
424
                        upCount := 0
×
425
                        for i, nodeId := range nodeIdSlice {
×
426
                                instDn := fmt.Sprintf("topology/%s/node-%s/sys/vpc/inst", pathSlice[1], nodeId)
×
427
                                if i == 0 {
×
428
                                        continue
×
429
                                }
430
                                for _, obj := range apicresp.Imdata {
×
431
                                        for _, body := range obj {
×
432
                                                dn, ok := body.Attributes["dn"].(string)
×
433
                                                if ok && strings.Contains(dn, instDn) {
×
434
                                                        peerSt, ok := body.Attributes["peerSt"].(string)
×
435
                                                        if ok && peerSt == "up" {
×
436
                                                                upCount++
×
437
                                                        }
×
438
                                                }
439
                                        }
440
                                }
441
                        }
442
                        if upCount < 2 {
×
443
                                return true, nil
×
444
                        }
×
445
                        return false, nil
×
446
                } else {
×
447
                        return true, nil
×
448
                }
×
449
        }
450
        return false, fmt.Errorf("Invalid fabricPathDn %s ", fabricPathDn)
×
451
}
452

453
func (cont *AciController) createAciPodAnnotation(node string) (aciPodAnnot, error) {
×
454
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
455
        nodeAciPodAnnot := cont.nodeACIPod[node]
×
456
        isSingleOdev := false
×
457
        if odevCount == 1 {
×
458
                var err error
×
459
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
460
                if err != nil {
×
461
                        cont.log.Error(err)
×
462
                        return nodeAciPodAnnot, err
×
463
                }
×
464
        }
465
        if (odevCount == 0) ||
×
466
                (odevCount == 1 && !isSingleOdev) {
×
467
                if nodeAciPodAnnot.aciPod != "none" {
×
468
                        if nodeAciPodAnnot.disconnectTime.IsZero() {
×
469
                                nodeAciPodAnnot.disconnectTime = time.Now()
×
470
                        } else {
×
471
                                currentTime := time.Now()
×
472
                                diff := currentTime.Sub(nodeAciPodAnnot.disconnectTime)
×
473
                                if diff.Seconds() > float64(cont.config.OpflexDeviceReconnectWaitTimeout) {
×
474
                                        nodeAciPodAnnot.aciPod = "none"
×
475
                                        nodeAciPodAnnot.disconnectTime = time.Time{}
×
476
                                }
×
477
                        }
478
                }
479
                return nodeAciPodAnnot, nil
×
480
        } else if (odevCount == 2) ||
×
481
                (odevCount == 1 && isSingleOdev) {
×
482
                // when there is already a connected opflex device,
×
483
                // fabricPathDn will have latest pod iformation
×
484
                // and annotation will be in the form pod-<podid>-<subnet of pod>
×
485
                nodeAciPodAnnot.disconnectTime = time.Time{}
×
486
                nodeAciPod := nodeAciPodAnnot.aciPod
×
487
                pathSlice := strings.Split(fabricPathDn, "/")
×
488
                if len(pathSlice) > 1 {
×
489
                        pod := pathSlice[1]
×
490

×
491
                        // when there is difference in pod info avaliable from fabricPathDn
×
492
                        // and what we have in cache, update info in cache and change annotation on node
×
493
                        if !strings.Contains(nodeAciPod, pod) {
×
494
                                subnet, err := cont.getAciPodSubnet(pod)
×
495
                                if err != nil {
×
496
                                        cont.log.Error("Failed to get subnet of aci pod ", err.Error())
×
497
                                        return nodeAciPodAnnot, err
×
498
                                } else {
×
499
                                        nodeAciPodAnnot.aciPod = pod + "-" + subnet
×
500
                                        return nodeAciPodAnnot, nil
×
501
                                }
×
502
                        } else {
×
503
                                return nodeAciPodAnnot, nil
×
504
                        }
×
505
                } else {
×
506
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
507
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
508
                }
×
509
        }
510
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
511
}
512

513
func (cont *AciController) createNodeAciPodAnnotation(node string) (aciPodAnnot, error) {
×
514
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
515
        nodeAciPodAnnot := cont.nodeACIPodAnnot[node]
×
516
        isSingleOdev := false
×
517
        if odevCount == 1 {
×
518
                var err error
×
519
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
520
                if err != nil {
×
521
                        cont.log.Error(err)
×
522
                        return nodeAciPodAnnot, err
×
523
                }
×
524
        }
525
        if (odevCount == 0) ||
×
526
                (odevCount == 1 && !isSingleOdev) {
×
527
                if nodeAciPodAnnot.aciPod != "none" {
×
528
                        nodeAciPodAnnot.aciPod = "none"
×
529
                }
×
530
                return nodeAciPodAnnot, nil
×
531
        } else if (odevCount == 2) ||
×
532
                (odevCount == 1 && isSingleOdev) {
×
533
                pathSlice := strings.Split(fabricPathDn, "/")
×
534
                if len(pathSlice) > 1 {
×
535

×
536
                        nodeAciPodAnnot.aciPod = pathSlice[1]
×
537
                        return nodeAciPodAnnot, nil
×
538
                } else {
×
539
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
540
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
541
                }
×
542
        }
543
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
544
}
545

546
func (cont *AciController) checkChangeOfOpflexOdevAciPod() {
×
547
        var nodeAnnotationUpdates []string
×
548
        cont.apicConn.SyncMutex.Lock()
×
549
        syncDone := cont.apicConn.SyncDone
×
550
        cont.apicConn.SyncMutex.Unlock()
×
551

×
552
        if !syncDone {
×
553
                return
×
554
        }
×
555

556
        cont.indexMutex.Lock()
×
557
        for node := range cont.nodeACIPodAnnot {
×
558
                annot, err := cont.createNodeAciPodAnnotation(node)
×
559
                if err != nil {
×
560
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
561
                                now := time.Now()
×
562
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
563
                                        annot.lastErrorTime = now
×
564
                                        cont.nodeACIPodAnnot[node] = annot
×
565
                                        cont.log.Error(err.Error())
×
566
                                }
×
567
                        } else {
×
568
                                cont.log.Error(err.Error())
×
569
                        }
×
570
                } else {
×
571
                        if annot != cont.nodeACIPodAnnot[node] {
×
572
                                cont.nodeACIPodAnnot[node] = annot
×
573
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
574
                        }
×
575
                }
576
        }
577
        cont.indexMutex.Unlock()
×
578
        if len(nodeAnnotationUpdates) > 0 {
×
579
                for _, updatednode := range nodeAnnotationUpdates {
×
580
                        go cont.env.NodeAnnotationChanged(updatednode)
×
581
                }
×
582
        }
583
}
584

585
func (cont *AciController) checkChangeOfOdevAciPod() {
×
586
        var nodeAnnotationUpdates []string
×
587
        cont.apicConn.SyncMutex.Lock()
×
588
        syncDone := cont.apicConn.SyncDone
×
589
        cont.apicConn.SyncMutex.Unlock()
×
590

×
591
        if !syncDone {
×
592
                return
×
593
        }
×
594

595
        cont.indexMutex.Lock()
×
596
        for node := range cont.nodeACIPod {
×
597
                annot, err := cont.createAciPodAnnotation(node)
×
598
                if err != nil {
×
599
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
600
                                now := time.Now()
×
601
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
602
                                        annot.lastErrorTime = now
×
603
                                        cont.nodeACIPod[node] = annot
×
604
                                        cont.log.Error(err.Error())
×
605
                                }
×
606
                        } else {
×
607
                                cont.log.Error(err.Error())
×
608
                        }
×
609
                } else {
×
610
                        if annot != cont.nodeACIPod[node] {
×
611
                                cont.nodeACIPod[node] = annot
×
612
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
613
                        }
×
614
                }
615
        }
616
        cont.indexMutex.Unlock()
×
617
        if len(nodeAnnotationUpdates) > 0 {
×
618
                for _, updatednode := range nodeAnnotationUpdates {
×
619
                        go cont.env.NodeAnnotationChanged(updatednode)
×
620
                }
×
621
        }
622
}
623

624
func (cont *AciController) deleteOldOpflexDevices() {
1✔
625
        var nodeUpdates []string
1✔
626
        cont.indexMutex.Lock()
1✔
627
        for node, devices := range cont.nodeOpflexDevice {
1✔
628
                var delDevices apicapi.ApicSlice
×
629
                fabricPathDn := cont.getActiveFabricPathDn(node)
×
630
                if fabricPathDn != "" {
×
631
                        for _, device := range devices {
×
632
                                if device.GetAttrStr("delete") == "true" && device.GetAttrStr("fabricPathDn") != fabricPathDn {
×
633
                                        deleteTimeStr := device.GetAttrStr("deleteTime")
×
634
                                        deleteTime, err := time.Parse(time.RFC3339, deleteTimeStr)
×
635
                                        if err != nil {
×
636
                                                cont.log.Error("Failed to parse opflex device delete time: ", err)
×
637
                                                continue
×
638
                                        }
639
                                        now := time.Now()
×
640
                                        diff := now.Sub(deleteTime)
×
641
                                        if diff.Seconds() >= cont.config.OpflexDeviceDeleteTimeout {
×
642
                                                delDevices = append(delDevices, device)
×
643
                                        }
×
644
                                }
645
                        }
646
                        if len(delDevices) > 0 {
×
647
                                newDevices := deleteDevicesFromList(delDevices, devices)
×
648
                                cont.nodeOpflexDevice[node] = newDevices
×
649
                                cont.log.Info("Opflex device list for node ", node, " after deleting stale entries: ", cont.nodeOpflexDevice[node])
×
650
                                if len(newDevices) == 0 {
×
651
                                        delete(cont.nodeOpflexDevice, node)
×
652
                                }
×
653
                                nodeUpdates = append(nodeUpdates, node)
×
654
                        }
655
                }
656
        }
657
        cont.indexMutex.Unlock()
1✔
658
        if len(nodeUpdates) > 0 {
1✔
659
                cont.postOpflexDeviceDelete(nodeUpdates)
×
660
        }
×
661
}
662

663
// must have index lock
664
func (cont *AciController) setDeleteFlagForOldDevices(node, fabricPathDn string) {
1✔
665
        for _, device := range cont.nodeOpflexDevice[node] {
2✔
666
                if device.GetAttrStr("fabricPathDn") != fabricPathDn {
1✔
667
                        t := time.Now()
×
668
                        device.SetAttr("delete", "true")
×
669
                        device.SetAttr("deleteTime", t.Format(time.RFC3339))
×
670
                }
×
671
        }
672
}
673

674
// must have index lock
675
func (cont *AciController) fabricPathForNode(name string) (string, bool) {
1✔
676
        sz := len(cont.nodeOpflexDevice[name])
1✔
677
        for i := range cont.nodeOpflexDevice[name] {
2✔
678
                device := cont.nodeOpflexDevice[name][sz-1-i]
1✔
679
                deviceState := device.GetAttrStr("state")
1✔
680
                if deviceState == "connected" {
2✔
681
                        if deviceState != device.GetAttrStr("prevState") {
2✔
682
                                cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Info("Processing fabric path for node ",
1✔
683
                                        "when connected device state is found")
1✔
684
                                device.SetAttr("prevState", deviceState)
1✔
685
                        }
1✔
686
                        fabricPathDn := device.GetAttrStr("fabricPathDn")
1✔
687
                        cont.setDeleteFlagForOldDevices(name, fabricPathDn)
1✔
688
                        return fabricPathDn, true
1✔
689
                } else {
1✔
690
                        device.SetAttr("prevState", deviceState)
1✔
691
                }
1✔
692
        }
693
        if sz > 0 {
2✔
694
                // When the opflex-device for a node changes, for example during a live migration,
1✔
695
                // we end up with both the old and the new device objects till the old object
1✔
696
                // ages out on APIC. The new object is at end of the devices list (see opflexDeviceChanged),
1✔
697
                // so we return the fabricPathDn of the last opflex-device.
1✔
698
                cont.fabricPathLogger(cont.nodeOpflexDevice[name][sz-1].GetAttrStr("hostName"),
1✔
699
                        cont.nodeOpflexDevice[name][sz-1]).Info("Processing fabricPathDn for node")
1✔
700
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("fabricPathDn"), true
1✔
701
        }
1✔
702
        return "", false
1✔
703
}
704

705
// must have index lock
706
func (cont *AciController) deviceMacForNode(name string) (string, bool) {
1✔
707
        sz := len(cont.nodeOpflexDevice[name])
1✔
708
        if sz > 0 {
2✔
709
                // When the opflex-device for a node changes, for example when the
1✔
710
                // node is recreated, we end up with both the old and the new
1✔
711
                // device objects till the old object ages out on APIC. The
1✔
712
                // new object is at end of the devices list (see
1✔
713
                // opflexDeviceChanged), so we return the MAC address of the
1✔
714
                // last opflex-device.
1✔
715
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("mac"), true
1✔
716
        }
1✔
717
        return "", false
1✔
718
}
719

720
func apicRedirectDst(rpDn string, ip string, mac string,
721
        descr string, healthGroupDn string, enablePbrTracking bool) apicapi.ApicObject {
1✔
722
        dst := apicapi.NewVnsRedirectDest(rpDn, ip, mac).SetAttr("descr", descr)
1✔
723
        if healthGroupDn != "" && enablePbrTracking {
2✔
724
                dst.AddChild(apicapi.NewVnsRsRedirectHealthGroup(dst.GetDn(),
1✔
725
                        healthGroupDn))
1✔
726
        }
1✔
727
        return dst
1✔
728
}
729

730
func (cont *AciController) apicRedirectPol(name string, tenantName string, nodes []string,
731
        nodeMap map[string]*metadata.ServiceEndpoint,
732
        monPolDn string, enablePbrTracking bool) (apicapi.ApicObject, string) {
1✔
733
        rp := apicapi.NewVnsSvcRedirectPol(tenantName, name)
1✔
734
        rp.SetAttr("thresholdDownAction", "deny")
1✔
735
        if cont.config.DisableResilientHashing {
1✔
736
                rp.SetAttr("resilientHashEnabled", "no")
×
737
        }
×
738
        rpDn := rp.GetDn()
1✔
739
        for _, node := range nodes {
2✔
740
                cont.indexMutex.Lock()
1✔
741
                serviceEp, ok := nodeMap[node]
1✔
742
                if !ok {
1✔
743
                        continue
×
744
                }
745
                if serviceEp.Ipv4 != nil {
2✔
746
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv4.String(),
1✔
747
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
1✔
748
                }
1✔
749
                if serviceEp.Ipv6 != nil {
1✔
750
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv6.String(),
×
751
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
×
752
                }
×
753
                cont.indexMutex.Unlock()
1✔
754
        }
755
        if monPolDn != "" && enablePbrTracking {
2✔
756
                rp.AddChild(apicapi.NewVnsRsIPSLAMonitoringPol(rpDn, monPolDn))
1✔
757
        }
1✔
758
        return rp, rpDn
1✔
759
}
760

761
func apicExtNetCreate(enDn string, ingress string, ipv4 bool,
762
        cidr bool, sharedSec bool) apicapi.ApicObject {
1✔
763
        if !cidr {
2✔
764
                if ipv4 {
2✔
765
                        ingress += "/32"
1✔
766
                } else {
1✔
767
                        ingress += "/128"
×
768
                }
×
769
        }
770
        subnet := apicapi.NewL3extSubnet(enDn, ingress, "", "")
1✔
771
        if sharedSec {
2✔
772
                subnet.SetAttr("scope", "import-security,shared-security")
1✔
773
        }
1✔
774
        return subnet
1✔
775
}
776

777
func apicExtNet(name string, tenantName string, l3Out string,
778
        ingresses []string, sharedSecurity bool, snat bool) apicapi.ApicObject {
1✔
779
        en := apicapi.NewL3extInstP(tenantName, l3Out, name)
1✔
780
        enDn := en.GetDn()
1✔
781
        if snat {
2✔
782
                en.AddChild(apicapi.NewFvRsCons(enDn, name))
1✔
783
        } else {
2✔
784
                en.AddChild(apicapi.NewFvRsProv(enDn, name))
1✔
785
        }
1✔
786

787
        for _, ingress := range ingresses {
2✔
788
                ip, _, _ := net.ParseCIDR(ingress)
1✔
789
                // If ingress is a subnet
1✔
790
                if ip != nil {
2✔
791
                        if ip != nil && ip.To4() != nil {
2✔
792
                                subnet := apicExtNetCreate(enDn, ingress, true, true, sharedSecurity)
1✔
793
                                en.AddChild(subnet)
1✔
794
                        } else if ip != nil && ip.To16() != nil {
1✔
795
                                subnet := apicExtNetCreate(enDn, ingress, false, true, sharedSecurity)
×
796
                                en.AddChild(subnet)
×
797
                        }
×
798
                } else {
1✔
799
                        // If ingress is an IP address
1✔
800
                        ip := net.ParseIP(ingress)
1✔
801
                        if ip != nil && ip.To4() != nil {
2✔
802
                                subnet := apicExtNetCreate(enDn, ingress, true, false, sharedSecurity)
1✔
803
                                en.AddChild(subnet)
1✔
804
                        } else if ip != nil && ip.To16() != nil {
1✔
805
                                subnet := apicExtNetCreate(enDn, ingress, false, false, sharedSecurity)
×
806
                                en.AddChild(subnet)
×
807
                        }
×
808
                }
809
        }
810
        return en
1✔
811
}
812

813
func apicDefaultEgCons(conName string, tenantName string,
814
        appProfile string, epg string) apicapi.ApicObject {
×
815
        enDn := fmt.Sprintf("uni/tn-%s/ap-%s/epg-%s", tenantName, appProfile, epg)
×
816
        return apicapi.NewFvRsCons(enDn, conName)
×
817
}
×
818

819
func apicExtNetCons(conName string, tenantName string,
820
        l3Out string, net string) apicapi.ApicObject {
1✔
821
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
822
        return apicapi.NewFvRsCons(enDn, conName)
1✔
823
}
1✔
824

825
func apicExtNetProv(conName string, tenantName string,
826
        l3Out string, net string) apicapi.ApicObject {
1✔
827
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
828
        return apicapi.NewFvRsProv(enDn, conName)
1✔
829
}
1✔
830

831
// stringInSlice reports whether str is equal to at least one element of list.
// It returns false for a nil or empty list.
func stringInSlice(str string, list []string) bool {
	for idx := 0; idx < len(list); idx++ {
		if list[idx] == str {
			return true
		}
	}
	return false
}
840

841
// validScope reports whether scope is one of the accepted contract scope
// values: the empty string, "context", "tenant" or "global". The comparison is
// exact, so callers must normalise case beforehand.
func validScope(scope string) bool {
	switch scope {
	case "", "context", "tenant", "global":
		return true
	}
	return false
}
1✔
845

846
func (cont *AciController) getGraphNameFromContract(name, tenantName string) (string, error) {
×
847
        var graphName string
×
848
        args := []string{
×
849
                "query-target=subtree",
×
850
        }
×
851
        url := fmt.Sprintf("/api/node/mo/uni/tn-%s/brc-%s.json?%s", tenantName, name, strings.Join(args, "&"))
×
852
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
853
        if err != nil {
×
854
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
855
                return graphName, err
×
856
        }
×
857
        for _, obj := range apicresp.Imdata {
×
858
                for class, body := range obj {
×
859
                        if class == "vzRsSubjGraphAtt" {
×
860
                                tnVnsAbsGraphName, ok := body.Attributes["tnVnsAbsGraphName"].(string)
×
861
                                if ok {
×
862
                                        graphName = tnVnsAbsGraphName
×
863
                                }
×
864
                                break
×
865
                        }
866
                }
867
        }
868
        cont.log.Debug("graphName: ", graphName)
×
869
        return graphName, err
×
870
}
871

872
func apicContract(conName string, tenantName string,
873
        graphName string, scopeName string, isSnatPbrFltrChain bool,
874
        customSGAnnot bool) apicapi.ApicObject {
1✔
875
        con := apicapi.NewVzBrCP(tenantName, conName)
1✔
876
        if scopeName != "" && scopeName != "context" {
2✔
877
                con.SetAttr("scope", scopeName)
1✔
878
        }
1✔
879
        cs := apicapi.NewVzSubj(con.GetDn(), "loadbalancedservice")
1✔
880
        csDn := cs.GetDn()
1✔
881
        if isSnatPbrFltrChain {
2✔
882
                cs.SetAttr("revFltPorts", "no")
1✔
883
                inTerm := apicapi.NewVzInTerm(csDn)
1✔
884
                outTerm := apicapi.NewVzOutTerm(csDn)
1✔
885
                inTerm.AddChild(apicapi.NewVzRsInTermGraphAtt(inTerm.GetDn(), graphName))
1✔
886
                inTerm.AddChild(apicapi.NewVzRsFiltAtt(inTerm.GetDn(), conName+"_fromCons-toProv"))
1✔
887
                outTerm.AddChild(apicapi.NewVzRsOutTermGraphAtt(outTerm.GetDn(), graphName))
1✔
888
                outTerm.AddChild(apicapi.NewVzRsFiltAtt(outTerm.GetDn(), conName+"_fromProv-toCons"))
1✔
889
                cs.AddChild(inTerm)
1✔
890
                cs.AddChild(outTerm)
1✔
891
        } else {
2✔
892
                cs.AddChild(apicapi.NewVzRsSubjGraphAtt(csDn, graphName, customSGAnnot))
1✔
893
                cs.AddChild(apicapi.NewVzRsSubjFiltAtt(csDn, conName))
1✔
894
        }
1✔
895
        con.AddChild(cs)
1✔
896
        return con
1✔
897
}
898

899
func apicDevCtx(name string, tenantName string,
900
        graphName string, deviceName string, bdName string, rpDn string, isSnatPbrFltrChain bool) apicapi.ApicObject {
1✔
901
        cc := apicapi.NewVnsLDevCtx(tenantName, name, graphName, "loadbalancer")
1✔
902
        ccDn := cc.GetDn()
1✔
903
        graphDn := fmt.Sprintf("uni/tn-%s/lDevVip-%s", tenantName, deviceName)
1✔
904
        lifDn := fmt.Sprintf("%s/lIf-%s", graphDn, "interface")
1✔
905
        bdDn := fmt.Sprintf("uni/tn-%s/BD-%s", tenantName, bdName)
1✔
906
        cc.AddChild(apicapi.NewVnsRsLDevCtxToLDev(ccDn, graphDn))
1✔
907
        rpDnBase := rpDn
1✔
908
        for _, ctxConn := range []string{"consumer", "provider"} {
2✔
909
                lifCtx := apicapi.NewVnsLIfCtx(ccDn, ctxConn)
1✔
910
                if isSnatPbrFltrChain {
2✔
911
                        if ctxConn == "consumer" {
2✔
912
                                rpDn = rpDnBase + "_Cons"
1✔
913
                        } else {
2✔
914
                                rpDn = rpDnBase + "_Prov"
1✔
915
                        }
1✔
916
                }
917
                lifCtxDn := lifCtx.GetDn()
1✔
918
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToSvcRedirectPol(lifCtxDn, rpDn))
1✔
919
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToBD(lifCtxDn, bdDn))
1✔
920
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToLIf(lifCtxDn, lifDn))
1✔
921
                cc.AddChild(lifCtx)
1✔
922
        }
923
        return cc
1✔
924
}
925

926
func apicFilterEntry(filterDn string, count string, p_start string,
927
        p_end string, protocol string, stateful string, snat bool, outTerm bool) apicapi.ApicObject {
1✔
928
        fe := apicapi.NewVzEntry(filterDn, count)
1✔
929
        fe.SetAttr("etherT", "ip")
1✔
930
        fe.SetAttr("prot", protocol)
1✔
931
        if snat {
2✔
932
                if outTerm {
2✔
933
                        if protocol == "tcp" {
2✔
934
                                fe.SetAttr("tcpRules", "est")
1✔
935
                        }
1✔
936
                        // Reverse the ports for outTerm
937
                        fe.SetAttr("dFromPort", p_start)
1✔
938
                        fe.SetAttr("dToPort", p_end)
1✔
939
                } else {
1✔
940
                        fe.SetAttr("sFromPort", p_start)
1✔
941
                        fe.SetAttr("sToPort", p_end)
1✔
942
                }
1✔
943
        } else {
1✔
944
                fe.SetAttr("dFromPort", p_start)
1✔
945
                fe.SetAttr("dToPort", p_end)
1✔
946
        }
1✔
947
        fe.SetAttr("stateful", stateful)
1✔
948
        return fe
1✔
949
}
950
func apicFilter(name string, tenantName string,
951
        portSpec []v1.ServicePort, snat bool, snatRange portRangeSnat) apicapi.ApicObject {
1✔
952
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
953
        filterDn := filter.GetDn()
1✔
954

1✔
955
        var i int
1✔
956
        var port v1.ServicePort
1✔
957
        for i, port = range portSpec {
2✔
958
                pstr := strconv.Itoa(int(port.Port))
1✔
959
                proto := getProtocolStr(port.Protocol)
1✔
960
                fe := apicFilterEntry(filterDn, strconv.Itoa(i), pstr,
1✔
961
                        pstr, proto, "no", false, false)
1✔
962
                filter.AddChild(fe)
1✔
963
        }
1✔
964

965
        if snat {
1✔
966
                portSpec := []portRangeSnat{snatRange}
×
967
                p_start := strconv.Itoa(portSpec[0].start)
×
968
                p_end := strconv.Itoa(portSpec[0].end)
×
969

×
970
                fe1 := apicFilterEntry(filterDn, strconv.Itoa(i+1), p_start,
×
971
                        p_end, "tcp", "no", false, false)
×
972
                filter.AddChild(fe1)
×
973
                fe2 := apicFilterEntry(filterDn, strconv.Itoa(i+2), p_start,
×
974
                        p_end, "udp", "no", false, false)
×
975
                filter.AddChild(fe2)
×
976
        }
×
977
        return filter
1✔
978
}
979

980
func apicFilterSnat(name string, tenantName string,
981
        portSpec []portRangeSnat, outTerm bool) apicapi.ApicObject {
1✔
982
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
983
        filterDn := filter.GetDn()
1✔
984

1✔
985
        p_start := strconv.Itoa(portSpec[0].start)
1✔
986
        p_end := strconv.Itoa(portSpec[0].end)
1✔
987

1✔
988
        fe := apicFilterEntry(filterDn, "0", p_start,
1✔
989
                p_end, "tcp", "no", true, outTerm)
1✔
990
        filter.AddChild(fe)
1✔
991
        fe1 := apicFilterEntry(filterDn, "1", p_start,
1✔
992
                p_end, "udp", "no", true, outTerm)
1✔
993
        filter.AddChild(fe1)
1✔
994

1✔
995
        return filter
1✔
996
}
1✔
997

998
func (cont *AciController) updateServiceDeviceInstance(key string,
999
        service *v1.Service) error {
1✔
1000
        cont.indexMutex.Lock()
1✔
1001
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
1002
        cont.serviceEndPoints.GetnodesMetadata(key, service, nodeMap)
1✔
1003
        cont.indexMutex.Unlock()
1✔
1004

1✔
1005
        var nodes []string
1✔
1006
        for node := range nodeMap {
2✔
1007
                nodes = append(nodes, node)
1✔
1008
        }
1✔
1009
        sort.Strings(nodes)
1✔
1010
        name := cont.aciNameForKey("svc", key)
1✔
1011
        var conScope string
1✔
1012
        scopeVal, ok := service.ObjectMeta.Annotations[metadata.ServiceContractScopeAnnotation]
1✔
1013
        if ok {
2✔
1014
                normScopeVal := strings.ToLower(scopeVal)
1✔
1015
                if !validScope(normScopeVal) {
1✔
1016
                        errString := "Invalid service contract scope value provided " + scopeVal
×
1017
                        err := errors.New(errString)
×
1018
                        serviceLogger(cont.log, service).Error("Could not create contract: ", err)
×
1019
                        return err
×
1020
                } else {
1✔
1021
                        conScope = normScopeVal
1✔
1022
                }
1✔
1023
        } else {
1✔
1024
                conScope = DefaultServiceContractScope
1✔
1025
        }
1✔
1026

1027
        var sharedSecurity bool
1✔
1028
        if conScope == "global" {
2✔
1029
                sharedSecurity = true
1✔
1030
        } else {
2✔
1031
                sharedSecurity = DefaultServiceExtSubNetShared
1✔
1032
        }
1✔
1033

1034
        graphName := cont.aciNameForKey("svc", "global")
1✔
1035
        deviceName := cont.aciNameForKey("svc", "global")
1✔
1036
        _, customSGAnnPresent := service.ObjectMeta.Annotations[metadata.ServiceGraphNameAnnotation]
1✔
1037
        if customSGAnnPresent {
1✔
1038
                customSG, err := cont.getGraphNameFromContract(name, cont.config.AciVrfTenant)
×
1039
                if err == nil {
×
1040
                        graphName = customSG
×
1041
                }
×
1042
        }
1043
        cont.log.Debug("Using service graph ", graphName, " for service ", key)
1✔
1044

1✔
1045
        var serviceObjs apicapi.ApicSlice
1✔
1046
        if len(nodes) > 0 {
2✔
1047
                // 1. Service redirect policy
1✔
1048
                // The service redirect policy contains the MAC address
1✔
1049
                // and IP address of each of the service endpoints for
1✔
1050
                // each node that hosts a pod for this service.  The
1✔
1051
                // example below shows the case of two nodes.
1✔
1052
                rp, rpDn :=
1✔
1053
                        cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
1✔
1054
                                nodeMap, cont.staticMonPolDn(), cont.config.AciPbrTrackingNonSnat)
1✔
1055
                serviceObjs = append(serviceObjs, rp)
1✔
1056

1✔
1057
                // 2. Service graph contract and external network
1✔
1058
                // The service graph contract must be bound to the service
1✔
1059
                // graph.  This contract must be consumed by the default
1✔
1060
                // layer 3 network and provided by the service layer 3
1✔
1061
                // network.
1✔
1062
                {
2✔
1063
                        var ingresses []string
1✔
1064
                        for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1065
                                ingresses = append(ingresses, ingress.IP)
1✔
1066
                        }
1✔
1067
                        serviceObjs = append(serviceObjs,
1✔
1068
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1069
                                        cont.config.AciL3Out, ingresses, sharedSecurity, false))
1✔
1070
                }
1071

1072
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, false, customSGAnnPresent)
1✔
1073
                serviceObjs = append(serviceObjs, contract)
1✔
1074
                for _, net := range cont.config.AciExtNetworks {
2✔
1075
                        serviceObjs = append(serviceObjs,
1✔
1076
                                apicExtNetCons(name, cont.config.AciVrfTenant,
1✔
1077
                                        cont.config.AciL3Out, net))
1✔
1078
                }
1✔
1079

1080
                if cont.config.AddExternalContractToDefaultEPG && service.Spec.Type == v1.ServiceTypeLoadBalancer {
1✔
1081
                        defaultEpgTenant := cont.config.DefaultEg.PolicySpace
×
1082
                        defaultEpgStringSplit := strings.Split(cont.config.DefaultEg.Name, "|")
×
1083
                        var defaultEpgName, appProfile string
×
1084
                        if len(defaultEpgStringSplit) > 1 {
×
1085
                                appProfile = defaultEpgStringSplit[0]
×
1086
                                defaultEpgName = defaultEpgStringSplit[1]
×
1087
                        } else {
×
1088
                                appProfile = cont.config.AppProfile
×
1089
                                defaultEpgName = defaultEpgStringSplit[0]
×
1090
                        }
×
1091
                        serviceObjs = append(serviceObjs,
×
1092
                                apicDefaultEgCons(name, defaultEpgTenant, appProfile, defaultEpgName))
×
1093
                }
1094

1095
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1096
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1097

1✔
1098
                _, snat := cont.snatServices[key]
1✔
1099
                filter := apicFilter(name, cont.config.AciVrfTenant,
1✔
1100
                        service.Spec.Ports, snat, defaultPortRange)
1✔
1101
                serviceObjs = append(serviceObjs, filter)
1✔
1102

1✔
1103
                // 3. Device cluster context
1✔
1104
                // The logical device context binds the service contract
1✔
1105
                // to the redirect policy and the device cluster and
1✔
1106
                // bridge domain for the device cluster.
1✔
1107
                serviceObjs = append(serviceObjs,
1✔
1108
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, deviceName,
1✔
1109
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, false))
1✔
1110
        }
1111

1112
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1113
        return nil
1✔
1114
}
1115

1116
func (cont *AciController) updateServiceDeviceInstanceSnat(key string) error {
1✔
1117
        nodeList := cont.nodeIndexer.List()
1✔
1118
        cont.indexMutex.Lock()
1✔
1119
        if len(cont.nodeServiceMetaCache) == 0 {
2✔
1120
                cont.indexMutex.Unlock()
1✔
1121
                return nil
1✔
1122
        }
1✔
1123
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
1124
        sort.Slice(nodeList, func(i, j int) bool {
2✔
1125
                nodeA := nodeList[i].(*v1.Node)
1✔
1126
                nodeB := nodeList[j].(*v1.Node)
1✔
1127
                return nodeA.ObjectMeta.Name < nodeB.ObjectMeta.Name
1✔
1128
        })
1✔
1129
        for itr, nodeItem := range nodeList {
2✔
1130
                if itr == cont.config.MaxSvcGraphNodes {
1✔
1131
                        break
×
1132
                }
1133
                node := nodeItem.(*v1.Node)
1✔
1134
                nodeName := node.ObjectMeta.Name
1✔
1135
                nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
1136
                if !ok {
2✔
1137
                        continue
1✔
1138
                }
1139
                _, ok = cont.fabricPathForNode(nodeName)
1✔
1140
                if !ok {
1✔
1141
                        continue
×
1142
                }
1143
                nodeLabels := node.ObjectMeta.Labels
1✔
1144
                excludeNode := cont.nodeLabelsInExcludeList(nodeLabels)
1✔
1145
                if excludeNode {
1✔
1146
                        continue
×
1147
                }
1148
                nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
1149
        }
1150
        cont.indexMutex.Unlock()
1✔
1151

1✔
1152
        var nodes []string
1✔
1153
        for node := range nodeMap {
2✔
1154
                nodes = append(nodes, node)
1✔
1155
        }
1✔
1156
        sort.Strings(nodes)
1✔
1157
        name := cont.aciNameForKey("snat", key)
1✔
1158
        var conScope = cont.config.SnatSvcContractScope
1✔
1159
        sharedSecurity := true
1✔
1160

1✔
1161
        graphName := cont.aciNameForKey("svc", "global")
1✔
1162
        var serviceObjs apicapi.ApicSlice
1✔
1163
        if len(nodes) > 0 {
2✔
1164
                // 1. Service redirect policy
1✔
1165
                // The service redirect policy contains the MAC address
1✔
1166
                // and IP address of each of the service endpoints for
1✔
1167
                // each node that hosts a pod for this service.
1✔
1168
                // For SNAT with the introduction of filter-chain usage, to work-around
1✔
1169
                // an APIC limitation, creating two PBR policies with same nodes.
1✔
1170
                var rpDn string
1✔
1171
                var rp apicapi.ApicObject
1✔
1172
                if cont.apicConn.SnatPbrFltrChain {
2✔
1173
                        rpCons, rpDnCons :=
1✔
1174
                                cont.apicRedirectPol(name+"_Cons", cont.config.AciVrfTenant, nodes,
1✔
1175
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1176
                        serviceObjs = append(serviceObjs, rpCons)
1✔
1177
                        rpProv, _ :=
1✔
1178
                                cont.apicRedirectPol(name+"_Prov", cont.config.AciVrfTenant, nodes,
1✔
1179
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1180
                        serviceObjs = append(serviceObjs, rpProv)
1✔
1181
                        rpDn = strings.TrimSuffix(rpDnCons, "_Cons")
1✔
1182
                } else {
1✔
1183
                        rp, rpDn =
×
1184
                                cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
×
1185
                                        nodeMap, cont.staticMonPolDn(), true)
×
1186
                        serviceObjs = append(serviceObjs, rp)
×
1187
                }
×
1188
                // 2. Service graph contract and external network
1189
                // The service graph contract must be bound to the
1190
                // service
1191
                // graph.  This contract must be consumed by the default
1192
                // layer 3 network and provided by the service layer 3
1193
                // network.
1194
                {
1✔
1195
                        var ingresses []string
1✔
1196
                        for _, policy := range cont.snatPolicyCache {
2✔
1197
                                ingresses = append(ingresses, policy.SnatIp...)
1✔
1198
                        }
1✔
1199
                        serviceObjs = append(serviceObjs,
1✔
1200
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1201
                                        cont.config.AciL3Out, ingresses, sharedSecurity, true))
1✔
1202
                }
1203

1204
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, cont.apicConn.SnatPbrFltrChain, false)
1✔
1205
                serviceObjs = append(serviceObjs, contract)
1✔
1206

1✔
1207
                for _, net := range cont.config.AciExtNetworks {
2✔
1208
                        serviceObjs = append(serviceObjs,
1✔
1209
                                apicExtNetProv(name, cont.config.AciVrfTenant,
1✔
1210
                                        cont.config.AciL3Out, net))
1✔
1211
                }
1✔
1212

1213
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1214
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1215
                portSpec := []portRangeSnat{defaultPortRange}
1✔
1216
                if cont.apicConn.SnatPbrFltrChain {
2✔
1217
                        filterIn := apicFilterSnat(name+"_fromCons-toProv", cont.config.AciVrfTenant, portSpec, false)
1✔
1218
                        serviceObjs = append(serviceObjs, filterIn)
1✔
1219
                        filterOut := apicFilterSnat(name+"_fromProv-toCons", cont.config.AciVrfTenant, portSpec, true)
1✔
1220
                        serviceObjs = append(serviceObjs, filterOut)
1✔
1221
                } else {
1✔
1222
                        filter := apicFilterSnat(name, cont.config.AciVrfTenant, portSpec, false)
×
1223
                        serviceObjs = append(serviceObjs, filter)
×
1224
                }
×
1225
                // 3. Device cluster context
1226
                // The logical device context binds the service contract
1227
                // to the redirect policy and the device cluster and
1228
                // bridge domain for the device cluster.
1229
                serviceObjs = append(serviceObjs,
1✔
1230
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, graphName,
1✔
1231
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, cont.apicConn.SnatPbrFltrChain))
1✔
1232
        }
1233
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1234
        return nil
1✔
1235
}
1236

1237
func (cont *AciController) nodeLabelsInExcludeList(Labels map[string]string) bool {
1✔
1238
        nodeSnatRedirectExclude := cont.config.NodeSnatRedirectExclude
1✔
1239

1✔
1240
        for _, nodeGroup := range nodeSnatRedirectExclude {
1✔
1241
                if len(nodeGroup.Labels) == 0 {
×
1242
                        continue
×
1243
                }
1244
                matchFound := true
×
1245
                for _, label := range nodeGroup.Labels {
×
1246
                        if _, ok := Labels["node-role.kubernetes.io/"+label]; !ok {
×
1247
                                matchFound = false
×
1248
                                break
×
1249
                        }
1250
                }
1251
                if matchFound {
×
1252
                        return true
×
1253
                }
×
1254
        }
1255
        return false
1✔
1256
}
1257

1258
func (cont *AciController) queueServiceUpdateByKey(key string) {
1✔
1259
        cont.serviceQueue.Add(key)
1✔
1260
}
1✔
1261

1262
func (cont *AciController) queueServiceUpdate(service *v1.Service) {
1✔
1263
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
1264
        if err != nil {
1✔
1265
                serviceLogger(cont.log, service).
×
1266
                        Error("Could not create service key: ", err)
×
1267
                return
×
1268
        }
×
1269
        cont.serviceQueue.Add(key)
1✔
1270
}
1271

1272
func apicDeviceCluster(name string, vrfTenant string,
1273
        physDom string, encap string,
1274
        nodes []string, nodeMap map[string]string) (apicapi.ApicObject, string) {
1✔
1275
        dc := apicapi.NewVnsLDevVip(vrfTenant, name)
1✔
1276
        dc.SetAttr("managed", "no")
1✔
1277
        dcDn := dc.GetDn()
1✔
1278
        dc.AddChild(apicapi.NewVnsRsALDevToPhysDomP(dcDn,
1✔
1279
                fmt.Sprintf("uni/phys-%s", physDom)))
1✔
1280
        lif := apicapi.NewVnsLIf(dcDn, "interface")
1✔
1281
        lif.SetAttr("encap", encap)
1✔
1282
        lifDn := lif.GetDn()
1✔
1283

1✔
1284
        for _, node := range nodes {
2✔
1285
                path, ok := nodeMap[node]
1✔
1286
                if !ok {
1✔
1287
                        continue
×
1288
                }
1289

1290
                cdev := apicapi.NewVnsCDev(dcDn, node)
1✔
1291
                cif := apicapi.NewVnsCif(cdev.GetDn(), "interface")
1✔
1292
                cif.AddChild(apicapi.NewVnsRsCIfPathAtt(cif.GetDn(), path))
1✔
1293
                cdev.AddChild(cif)
1✔
1294
                lif.AddChild(apicapi.NewVnsRsCIfAttN(lifDn, cif.GetDn()))
1✔
1295
                dc.AddChild(cdev)
1✔
1296
        }
1297

1298
        dc.AddChild(lif)
1✔
1299

1✔
1300
        return dc, dcDn
1✔
1301
}
1302

1303
func apicServiceGraph(name string, tenantName string,
1304
        dcDn string) apicapi.ApicObject {
1✔
1305
        sg := apicapi.NewVnsAbsGraph(tenantName, name)
1✔
1306
        sgDn := sg.GetDn()
1✔
1307
        var provDn string
1✔
1308
        var consDn string
1✔
1309
        var cTermDn string
1✔
1310
        var pTermDn string
1✔
1311
        {
2✔
1312
                an := apicapi.NewVnsAbsNode(sgDn, "loadbalancer")
1✔
1313
                an.SetAttr("managed", "no")
1✔
1314
                an.SetAttr("routingMode", "Redirect")
1✔
1315
                anDn := an.GetDn()
1✔
1316
                cons := apicapi.NewVnsAbsFuncConn(anDn, "consumer")
1✔
1317
                consDn = cons.GetDn()
1✔
1318
                an.AddChild(cons)
1✔
1319
                prov := apicapi.NewVnsAbsFuncConn(anDn, "provider")
1✔
1320
                provDn = prov.GetDn()
1✔
1321
                an.AddChild(prov)
1✔
1322
                an.AddChild(apicapi.NewVnsRsNodeToLDev(anDn, dcDn))
1✔
1323
                sg.AddChild(an)
1✔
1324
        }
1✔
1325
        {
1✔
1326
                tnc := apicapi.NewVnsAbsTermNodeCon(sgDn, "T1")
1✔
1327
                tncDn := tnc.GetDn()
1✔
1328
                cTerm := apicapi.NewVnsAbsTermConn(tncDn)
1✔
1329
                cTermDn = cTerm.GetDn()
1✔
1330
                tnc.AddChild(cTerm)
1✔
1331
                tnc.AddChild(apicapi.NewVnsInTerm(tncDn))
1✔
1332
                tnc.AddChild(apicapi.NewVnsOutTerm(tncDn))
1✔
1333
                sg.AddChild(tnc)
1✔
1334
        }
1✔
1335
        {
1✔
1336
                tnp := apicapi.NewVnsAbsTermNodeProv(sgDn, "T2")
1✔
1337
                tnpDn := tnp.GetDn()
1✔
1338
                pTerm := apicapi.NewVnsAbsTermConn(tnpDn)
1✔
1339
                pTermDn = pTerm.GetDn()
1✔
1340
                tnp.AddChild(pTerm)
1✔
1341
                tnp.AddChild(apicapi.NewVnsInTerm(tnpDn))
1✔
1342
                tnp.AddChild(apicapi.NewVnsOutTerm(tnpDn))
1✔
1343
                sg.AddChild(tnp)
1✔
1344
        }
1✔
1345
        {
1✔
1346
                acc := apicapi.NewVnsAbsConnection(sgDn, "C1")
1✔
1347
                acc.SetAttr("connDir", "provider")
1✔
1348
                accDn := acc.GetDn()
1✔
1349
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, consDn))
1✔
1350
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, cTermDn))
1✔
1351
                sg.AddChild(acc)
1✔
1352
        }
1✔
1353
        {
1✔
1354
                acp := apicapi.NewVnsAbsConnection(sgDn, "C2")
1✔
1355
                acp.SetAttr("connDir", "provider")
1✔
1356
                acpDn := acp.GetDn()
1✔
1357
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, provDn))
1✔
1358
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, pTermDn))
1✔
1359
                sg.AddChild(acp)
1✔
1360
        }
1✔
1361
        return sg
1✔
1362
}
1363
func (cont *AciController) updateDeviceCluster() {
1✔
1364
        nodeMap := make(map[string]string)
1✔
1365
        cont.indexMutex.Lock()
1✔
1366
        for node := range cont.nodeOpflexDevice {
2✔
1367
                cont.log.Debug("Processing node in nodeOpflexDevice cache : ", node)
1✔
1368
                fabricPath, ok := cont.fabricPathForNode(node)
1✔
1369
                if !ok {
2✔
1370
                        continue
1✔
1371
                }
1372
                nodeMap[node] = fabricPath
1✔
1373
        }
1374

1375
        // For clusters other than OpenShift On OpenStack,
1376
        // openStackFabricPathDnMap will be empty
1377
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1378
                nodeMap[host] = opflexOdevInfo.fabricPathDn
×
1379
        }
×
1380

1381
        // For OpenShift On OpenStack clusters,
1382
        // hostFabricPathDnMap will be empty
1383
        for _, hostInfo := range cont.hostFabricPathDnMap {
1✔
1384
                if hostInfo.fabricPathDn != "" {
×
1385
                        nodeMap[hostInfo.host] = hostInfo.fabricPathDn
×
1386
                }
×
1387
        }
1388
        cont.indexMutex.Unlock()
1✔
1389

1✔
1390
        var nodes []string
1✔
1391
        for node := range nodeMap {
2✔
1392
                nodes = append(nodes, node)
1✔
1393
        }
1✔
1394
        sort.Strings(nodes)
1✔
1395

1✔
1396
        name := cont.aciNameForKey("svc", "global")
1✔
1397
        var serviceObjs apicapi.ApicSlice
1✔
1398

1✔
1399
        // 1. Device cluster:
1✔
1400
        // The device cluster is a set of physical paths that need to be
1✔
1401
        // created for each node in the cluster, that correspond to the
1✔
1402
        // service interface for each node.
1✔
1403
        dc, dcDn := apicDeviceCluster(name, cont.config.AciVrfTenant,
1✔
1404
                cont.config.AciServicePhysDom, cont.config.AciServiceEncap,
1✔
1405
                nodes, nodeMap)
1✔
1406
        serviceObjs = append(serviceObjs, dc)
1✔
1407

1✔
1408
        // 2. Service graph template
1✔
1409
        // The service graph controls how the traffic will be redirected.
1✔
1410
        // A service graph must be created for each device cluster.
1✔
1411
        serviceObjs = append(serviceObjs,
1✔
1412
                apicServiceGraph(name, cont.config.AciVrfTenant, dcDn))
1✔
1413

1✔
1414
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1415
}
1416

1417
func (cont *AciController) fabricPathLogger(node string,
1418
        obj apicapi.ApicObject) *logrus.Entry {
1✔
1419
        return cont.log.WithFields(logrus.Fields{
1✔
1420
                "fabricPath": obj.GetAttr("fabricPathDn"),
1✔
1421
                "mac":        obj.GetAttr("mac"),
1✔
1422
                "node":       node,
1✔
1423
                "obj":        obj,
1✔
1424
        })
1✔
1425
}
1✔
1426

1427
func (cont *AciController) setOpenStackSystemId() string {
×
1428

×
1429
        // 1) get opflexIDEp with containerName == <node name of any one of the openshift nodes>
×
1430
        // 2) extract OpenStack system id from compHvDn attribute
×
1431
        //    comp/prov-OpenStack/ctrlr-[k8s-scale]-k8s-scale/hv-overcloud-novacompute-0 - sample compHvDn,
×
1432
        //    where k8s-scale is the system id
×
1433

×
1434
        var systemId string
×
1435
        nodeList := cont.nodeIndexer.List()
×
1436
        if len(nodeList) < 1 {
×
1437
                return systemId
×
1438
        }
×
1439
        node := nodeList[0].(*v1.Node)
×
1440
        nodeName := node.ObjectMeta.Name
×
1441
        opflexIDEpFilter := fmt.Sprintf("query-target-filter=and(eq(opflexIDEp.containerName,\"%s\"))", nodeName)
×
1442
        opflexIDEpArgs := []string{
×
1443
                opflexIDEpFilter,
×
1444
        }
×
1445
        url := fmt.Sprintf("/api/node/class/opflexIDEp.json?%s", strings.Join(opflexIDEpArgs, "&"))
×
1446
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1447
        if err != nil {
×
1448
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1449
                return systemId
×
1450
        }
×
1451
        for _, obj := range apicresp.Imdata {
×
1452
                for _, body := range obj {
×
1453
                        compHvDn, ok := body.Attributes["compHvDn"].(string)
×
1454
                        if ok {
×
1455
                                systemId = compHvDn[strings.IndexByte(compHvDn, '[')+1 : strings.IndexByte(compHvDn, ']')]
×
1456
                                break
×
1457
                        }
1458
                }
1459
        }
1460
        cont.indexMutex.Lock()
×
1461
        cont.openStackSystemId = systemId
×
1462
        cont.log.Info("Setting OpenStack system id : ", cont.openStackSystemId)
×
1463
        cont.indexMutex.Unlock()
×
1464
        return systemId
×
1465
}
1466

1467
// Returns true when a new OpenStack opflexODev is added
1468
func (cont *AciController) openStackOpflexOdevUpdate(obj apicapi.ApicObject) bool {
×
1469

×
1470
        // If opflexOdev compHvDn contains comp/prov-OpenShift/ctrlr-[<systemid>]-<systemid>,
×
1471
        // it means that it is an OpenStack OpflexOdev which belongs to OpenStack with system id <systemid>
×
1472

×
1473
        var deviceClusterUpdate bool
×
1474
        compHvDn := obj.GetAttrStr("compHvDn")
×
1475
        if strings.Contains(compHvDn, "prov-OpenStack") {
×
1476
                cont.indexMutex.Lock()
×
1477
                systemId := cont.openStackSystemId
×
1478
                cont.indexMutex.Unlock()
×
1479
                if systemId == "" {
×
1480
                        systemId = cont.setOpenStackSystemId()
×
1481
                }
×
1482
                if systemId == "" {
×
1483
                        cont.log.Error("Failed  to get OpenStack system id")
×
1484
                        return deviceClusterUpdate
×
1485
                }
×
1486
                prefix := fmt.Sprintf("comp/prov-OpenStack/ctrlr-[%s]-%s", systemId, systemId)
×
1487
                if strings.Contains(compHvDn, prefix) {
×
1488
                        cont.log.Info("Received notification for OpenStack opflexODev update, hostName: ",
×
1489
                                obj.GetAttrStr("hostName"), " dn: ", obj.GetAttrStr("dn"))
×
1490
                        cont.indexMutex.Lock()
×
1491
                        opflexOdevInfo, ok := cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")]
×
1492
                        if ok {
×
1493
                                opflexOdevInfo.opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1494
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = opflexOdevInfo
×
1495
                        } else {
×
1496
                                var openstackopflexodevinfo openstackOpflexOdevInfo
×
1497
                                opflexODevDn := make(map[string]struct{})
×
1498
                                opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1499
                                openstackopflexodevinfo.fabricPathDn = obj.GetAttrStr("fabricPathDn")
×
1500
                                openstackopflexodevinfo.opflexODevDn = opflexODevDn
×
1501
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = openstackopflexodevinfo
×
1502
                                deviceClusterUpdate = true
×
1503
                        }
×
1504
                        cont.indexMutex.Unlock()
×
1505
                }
1506
        }
1507
        return deviceClusterUpdate
×
1508
}
1509

1510
func (cont *AciController) infraRtAttEntPDeleted(dn string) {
×
1511
        // dn format : uni/infra/attentp-k8s-scale-esxi-aaep/rtattEntP-[uni/infra/funcprof/accbundle-esxi1-vpc-ipg]
×
1512
        cont.log.Info("Processing delete of infraRtAttEntP: ", dn)
×
1513

×
1514
        // extract uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1515
        re := regexp.MustCompile(`\[(.*?)\]`)
×
1516
        matches := re.FindStringSubmatch(dn)
×
1517

×
1518
        if len(matches) < 2 {
×
1519
                cont.log.Error("Failed to extract ipg from dn : ", dn)
×
1520
                return
×
1521
        }
×
1522
        tdn := matches[1]
×
1523

×
1524
        cont.indexMutex.Lock()
×
1525
        _, ok := cont.hostFabricPathDnMap[tdn]
×
1526
        if ok {
×
1527
                delete(cont.hostFabricPathDnMap, tdn)
×
1528
                cont.log.Info("Deleted ipg : ", tdn)
×
1529
        }
×
1530
        cont.indexMutex.Unlock()
×
1531

×
1532
        if ok {
×
1533
                cont.updateDeviceCluster()
×
1534
        }
×
1535
}
1536

1537
func (cont *AciController) vpcIfDeleted(dn string) {
×
1538
        var deleted bool
×
1539
        cont.indexMutex.Lock()
×
1540
        for tDn, hostInfo := range cont.hostFabricPathDnMap {
×
1541
                if _, present := hostInfo.vpcIfDn[dn]; present {
×
1542
                        cont.log.Info("Deleting vpcIf, dn :", dn)
×
1543
                        delete(hostInfo.vpcIfDn, dn)
×
1544
                        if len(hostInfo.vpcIfDn) == 0 {
×
1545
                                cont.log.Infof("Removing fabricPathDn(%s) of ipg : %s ", hostInfo.fabricPathDn, hostInfo.host)
×
1546
                                hostInfo.fabricPathDn = ""
×
1547
                                deleted = true
×
1548
                        }
×
1549
                        cont.hostFabricPathDnMap[tDn] = hostInfo
×
1550
                }
1551
        }
1552
        cont.indexMutex.Unlock()
×
1553
        if deleted {
×
1554
                cont.updateDeviceCluster()
×
1555
        }
×
1556
}
1557

1558
func (cont *AciController) vpcIfChanged(obj apicapi.ApicObject) {
×
1559
        if cont.updateHostFabricPathDnMap(obj) {
×
1560
                cont.updateDeviceCluster()
×
1561
        }
×
1562
}
1563

1564
func (cont *AciController) updateHostFabricPathDnMap(obj apicapi.ApicObject) bool {
×
1565
        var accBndlGrpDn, fabricPathDn, dn string
×
1566
        for _, body := range obj {
×
1567
                var ok bool
×
1568
                accBndlGrpDn, ok = body.Attributes["accBndlGrpDn"].(string)
×
1569
                if !ok || (ok && accBndlGrpDn == "") {
×
1570
                        cont.log.Error("accBndlGrpDn missing/empty in vpcIf")
×
1571
                        return false
×
1572
                }
×
1573
                fabricPathDn, ok = body.Attributes["fabricPathDn"].(string)
×
1574
                if !ok && (ok && fabricPathDn == "") {
×
1575
                        cont.log.Error("fabricPathDn missing/empty in vpcIf")
×
1576
                        return false
×
1577
                }
×
1578
                dn, ok = body.Attributes["dn"].(string)
×
1579
                if !ok && (ok && dn == "") {
×
1580
                        cont.log.Error("dn missing/empty in vpcIf")
×
1581
                        return false
×
1582
                }
×
1583
        }
1584
        var updated bool
×
1585
        cont.indexMutex.Lock()
×
1586
        // If accBndlGrpDn exists in hostFabricPathDnMap, the vpcIf belongs to the cluster AEP
×
1587
        hostInfo, exists := cont.hostFabricPathDnMap[accBndlGrpDn]
×
1588
        if exists {
×
1589
                if _, present := hostInfo.vpcIfDn[dn]; !present {
×
1590
                        hostInfo.vpcIfDn[dn] = struct{}{}
×
1591
                        cont.log.Infof("vpcIf processing, dn : %s, accBndlGrpDn: %s", dn, accBndlGrpDn)
×
1592
                }
×
1593
                if hostInfo.fabricPathDn != fabricPathDn {
×
1594
                        hostInfo.fabricPathDn = fabricPathDn
×
1595
                        cont.log.Info("Updated fabricPathDn of ipg :", hostInfo.host, " to: ", hostInfo.fabricPathDn)
×
1596
                        updated = true
×
1597
                }
×
1598
                cont.hostFabricPathDnMap[accBndlGrpDn] = hostInfo
×
1599
        }
1600
        cont.indexMutex.Unlock()
×
1601
        return updated
×
1602
}
1603

1604
func (cont *AciController) infraRtAttEntPChanged(obj apicapi.ApicObject) {
×
1605
        var tdn string
×
1606
        for _, body := range obj {
×
1607
                var ok bool
×
1608
                tdn, ok = body.Attributes["tDn"].(string)
×
1609
                if !ok || (ok && tdn == "") {
×
1610
                        cont.log.Error("tDn missing/empty in infraRtAttEntP")
×
1611
                        return
×
1612
                }
×
1613
        }
1614
        var updated bool
×
1615
        cont.log.Info("infraRtAttEntP updated, tDn : ", tdn)
×
1616

×
1617
        // tdn format for vpc : /uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1618
        // tdn format for single leaf : /uni/infra/funcprof/accportgrp-IPG_CLIENT_SIM
×
1619

×
1620
        // Ignore processing of single leaf
×
1621
        if !strings.Contains(tdn, "/accbundle-") {
×
1622
                cont.log.Info("Skipping processing of infraRtAttEntP update, not applicable for non-VPC configuration: ", tdn)
×
1623
                return
×
1624
        }
×
1625

1626
        // extract esxi1-vpc-ipg
1627
        parts := strings.Split(tdn, "/")
×
1628
        lastPart := parts[len(parts)-1]
×
1629
        host := strings.TrimPrefix(lastPart, "accbundle-")
×
1630

×
1631
        // adding entry for ipg in hostFabricPathDnMap
×
1632
        cont.indexMutex.Lock()
×
1633
        _, exists := cont.hostFabricPathDnMap[tdn]
×
1634
        if !exists {
×
1635
                var hostInfo hostFabricInfo
×
1636
                hostInfo.host = host
×
1637
                hostInfo.vpcIfDn = make(map[string]struct{})
×
1638
                cont.hostFabricPathDnMap[tdn] = hostInfo
×
1639
        }
×
1640
        cont.indexMutex.Unlock()
×
1641

×
1642
        accBndlGrpFilter := fmt.Sprintf(`query-target-filter=and(eq(vpcIf.accBndlGrpDn,"%s"))`, tdn)
×
1643
        url := fmt.Sprintf("/api/class/vpcIf.json?%s", accBndlGrpFilter)
×
1644
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1645
        if err != nil {
×
1646
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1647
                return
×
1648
        }
×
1649

1650
        for _, obj := range apicresp.Imdata {
×
1651
                if cont.updateHostFabricPathDnMap(obj) && !updated {
×
1652
                        updated = true
×
1653
                }
×
1654
        }
1655

1656
        if updated {
×
1657
                cont.updateDeviceCluster()
×
1658
        }
×
1659
        return
×
1660
}
1661

1662
func (cont *AciController) opflexDeviceChanged(obj apicapi.ApicObject) {
1✔
1663
        devType := obj.GetAttrStr("devType")
1✔
1664
        domName := obj.GetAttrStr("domName")
1✔
1665
        ctrlrName := obj.GetAttrStr("ctrlrName")
1✔
1666

1✔
1667
        if !cont.config.DisableServiceVlanPreprovisioning && strings.Contains(cont.config.Flavor, "openstack") {
1✔
1668
                if cont.openStackOpflexOdevUpdate(obj) {
×
1669
                        cont.log.Info("OpenStack opflexODev for ", obj.GetAttrStr("hostName"), " is added")
×
1670
                        cont.updateDeviceCluster()
×
1671
                }
×
1672
        }
1673
        if (devType == cont.env.OpFlexDeviceType()) && (domName == cont.config.AciVmmDomain) && (ctrlrName == cont.config.AciVmmController) {
2✔
1674
                cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Processing opflex device update")
1✔
1675
                if obj.GetAttrStr("state") == "disconnected" {
2✔
1676
                        cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Opflex device disconnected")
1✔
1677
                        cont.indexMutex.Lock()
1✔
1678
                        for node, devices := range cont.nodeOpflexDevice {
1✔
1679
                                if node == obj.GetAttrStr("hostName") {
×
1680
                                        for _, device := range devices {
×
1681
                                                if device.GetDn() == obj.GetDn() {
×
1682
                                                        device.SetAttr("state", "disconnected")
×
1683
                                                        cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Debug("Opflex device cache updated for disconnected node")
×
1684
                                                }
×
1685
                                        }
1686
                                        cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", devices)
×
1687
                                        break
×
1688
                                }
1689
                        }
1690
                        cont.indexMutex.Unlock()
1✔
1691
                        cont.updateDeviceCluster()
1✔
1692
                        return
1✔
1693
                }
1694
                var nodeUpdates []string
1✔
1695

1✔
1696
                cont.indexMutex.Lock()
1✔
1697
                nodefound := false
1✔
1698
                for node, devices := range cont.nodeOpflexDevice {
2✔
1699
                        found := false
1✔
1700

1✔
1701
                        if node == obj.GetAttrStr("hostName") {
2✔
1702
                                nodefound = true
1✔
1703
                        }
1✔
1704

1705
                        for i, device := range devices {
2✔
1706
                                if device.GetDn() != obj.GetDn() {
2✔
1707
                                        continue
1✔
1708
                                }
1709
                                found = true
1✔
1710

1✔
1711
                                if obj.GetAttrStr("hostName") != node {
2✔
1712
                                        cont.fabricPathLogger(node, device).
1✔
1713
                                                Debug("Moving opflex device from node")
1✔
1714

1✔
1715
                                        devices = append(devices[:i], devices[i+1:]...)
1✔
1716
                                        cont.nodeOpflexDevice[node] = devices
1✔
1717
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1718
                                        break
1✔
1719
                                } else if (device.GetAttrStr("mac") != obj.GetAttrStr("mac")) ||
1✔
1720
                                        (device.GetAttrStr("fabricPathDn") != obj.GetAttrStr("fabricPathDn")) ||
1✔
1721
                                        (device.GetAttrStr("state") != obj.GetAttrStr("state")) {
2✔
1722
                                        cont.fabricPathLogger(node, obj).
1✔
1723
                                                Debug("Updating opflex device")
1✔
1724

1✔
1725
                                        devices = append(append(devices[:i], devices[i+1:]...), obj)
1✔
1726
                                        cont.nodeOpflexDevice[node] = devices
1✔
1727
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1728
                                        break
1✔
1729
                                }
1730
                        }
1731
                        if !found && obj.GetAttrStr("hostName") == node {
2✔
1732
                                cont.fabricPathLogger(node, obj).
1✔
1733
                                        Debug("Appending opflex device")
1✔
1734

1✔
1735
                                devices = append(devices, obj)
1✔
1736
                                cont.nodeOpflexDevice[node] = devices
1✔
1737
                                nodeUpdates = append(nodeUpdates, node)
1✔
1738
                        }
1✔
1739
                }
1740
                if !nodefound {
2✔
1741
                        node := obj.GetAttrStr("hostName")
1✔
1742
                        cont.fabricPathLogger(node, obj).Debug("Adding opflex device")
1✔
1743
                        cont.nodeOpflexDevice[node] = apicapi.ApicSlice{obj}
1✔
1744
                        nodeUpdates = append(nodeUpdates, node)
1✔
1745
                }
1✔
1746
                cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", cont.nodeOpflexDevice[obj.GetAttrStr("hostName")])
1✔
1747
                cont.indexMutex.Unlock()
1✔
1748

1✔
1749
                for _, node := range nodeUpdates {
2✔
1750
                        cont.env.NodeServiceChanged(node)
1✔
1751
                        cont.erspanSyncOpflexDev()
1✔
1752
                }
1✔
1753
                cont.updateDeviceCluster()
1✔
1754
        }
1755
}
1756

1757
func (cont *AciController) postOpflexDeviceDelete(nodes []string) {
1✔
1758
        cont.updateDeviceCluster()
1✔
1759
        for _, node := range nodes {
2✔
1760
                cont.env.NodeServiceChanged(node)
1✔
1761
                cont.erspanSyncOpflexDev()
1✔
1762
        }
1✔
1763
}
1764

1765
func (cont *AciController) opflexDeviceDeleted(dn string) {
1✔
1766
        var nodeUpdates []string
1✔
1767
        var dnFound bool //to check if the dn belongs to this cluster
1✔
1768
        cont.log.Info("Processing opflex device delete notification of ", dn)
1✔
1769
        cont.indexMutex.Lock()
1✔
1770
        for node, devices := range cont.nodeOpflexDevice {
2✔
1771
                for i, device := range devices {
2✔
1772
                        if device.GetDn() != dn {
2✔
1773
                                continue
1✔
1774
                        }
1775
                        dnFound = true
1✔
1776
                        cont.fabricPathLogger(node, device).
1✔
1777
                                Debug("Deleting opflex device path")
1✔
1778
                        devices = append(devices[:i], devices[i+1:]...)
1✔
1779
                        cont.nodeOpflexDevice[node] = devices
1✔
1780
                        cont.log.Info("Deleted opflex device of node ", node, ": ", dn)
1✔
1781
                        nodeUpdates = append(nodeUpdates, node)
1✔
1782
                        break
1✔
1783
                }
1784
                if len(devices) == 0 {
2✔
1785
                        delete(cont.nodeOpflexDevice, node)
1✔
1786
                }
1✔
1787
        }
1788

1789
        // For clusters other than OpenShift On OpenStack,
1790
        // openStackFabricPathDnMap will be empty
1791
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1792
                if _, ok := opflexOdevInfo.opflexODevDn[dn]; ok {
×
1793
                        cont.log.Info("Received OpenStack opflexODev delete notification for ", dn)
×
1794
                        delete(opflexOdevInfo.opflexODevDn, dn)
×
1795
                        if len(opflexOdevInfo.opflexODevDn) < 1 {
×
1796
                                delete(cont.openStackFabricPathDnMap, host)
×
1797
                                cont.log.Info("OpenStack opflexODev of host ", host, " is deleted from cache")
×
1798
                                dnFound = true
×
1799
                        } else {
×
1800
                                cont.openStackFabricPathDnMap[host] = opflexOdevInfo
×
1801
                        }
×
1802
                        break
×
1803
                }
1804
        }
1805
        cont.indexMutex.Unlock()
1✔
1806

1✔
1807
        if dnFound {
2✔
1808
                cont.postOpflexDeviceDelete(nodeUpdates)
1✔
1809
        }
1✔
1810
}
1811

1812
func (cont *AciController) writeApicSvc(key string, service *v1.Service) {
1✔
1813
        if cont.isCNOEnabled() {
1✔
1814
                return
×
1815
        }
×
1816
        aobj := apicapi.NewVmmInjectedSvc(cont.vmmDomainProvider(),
1✔
1817
                cont.config.AciVmmDomain, cont.config.AciVmmController,
1✔
1818
                service.Namespace, service.Name)
1✔
1819
        aobjDn := aobj.GetDn()
1✔
1820
        aobj.SetAttr("guid", string(service.UID))
1✔
1821

1✔
1822
        svcns := service.ObjectMeta.Namespace
1✔
1823
        _, exists, err := cont.namespaceIndexer.GetByKey(svcns)
1✔
1824
        if err != nil {
1✔
1825
                cont.log.Error("Failed to lookup ns : ", svcns, " ", err)
×
1826
                return
×
1827
        }
×
1828
        if !exists {
2✔
1829
                cont.log.Debug("Namespace of service ", service.ObjectMeta.Name, ": ", svcns, " doesn't exist, hence not sending an update to the APIC")
1✔
1830
                return
1✔
1831
        }
1✔
1832

1833
        if !cont.serviceEndPoints.SetServiceApicObject(aobj, service) {
2✔
1834
                return
1✔
1835
        }
1✔
1836
        var setApicSvcDnsName bool
1✔
1837
        if len(cont.config.ApicHosts) != 0 && apicapi.ApicVersion >= "5.1" {
1✔
1838
                setApicSvcDnsName = true
×
1839
        }
×
1840
        // APIC model only allows one of these
1841
        for _, ingress := range service.Status.LoadBalancer.Ingress {
1✔
1842
                if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1843
                        aobj.SetAttr("lbIp", ingress.IP)
×
1844
                } else if ingress.Hostname != "" {
×
1845
                        ipList, err := net.LookupHost(ingress.Hostname)
×
1846
                        if err == nil && len(ipList) > 0 {
×
1847
                                aobj.SetAttr("lbIp", ipList[0])
×
1848
                        } else {
×
1849
                                cont.log.Errorf("Lookup: err: %v, ipList: %+v", err, ipList)
×
1850
                        }
×
1851
                }
1852
                break
×
1853
        }
1854
        if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != "None" {
2✔
1855
                aobj.SetAttr("clusterIp", service.Spec.ClusterIP)
1✔
1856
        }
1✔
1857

1858
        var t string
1✔
1859
        switch service.Spec.Type {
1✔
1860
        case v1.ServiceTypeClusterIP:
×
1861
                t = "clusterIp"
×
1862
        case v1.ServiceTypeNodePort:
×
1863
                t = "nodePort"
×
1864
        case v1.ServiceTypeLoadBalancer:
1✔
1865
                t = "loadBalancer"
1✔
1866
        case v1.ServiceTypeExternalName:
×
1867
                t = "externalName"
×
1868
        }
1869
        if t != "" {
2✔
1870
                aobj.SetAttr("type", t)
1✔
1871
        }
1✔
1872

1873
        if setApicSvcDnsName || cont.config.Flavor == "k8s-overlay" {
1✔
1874
                dnsName := fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace)
×
1875

×
1876
                for _, ingress := range service.Status.LoadBalancer.Ingress {
×
1877
                        if ingress.Hostname != "" {
×
1878
                                aobj.SetAttr("dnsName", ingress.Hostname)
×
1879
                        } else if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1880
                                aobj.SetAttr("dnsName", dnsName)
×
1881
                        }
×
1882
                }
1883
                if t == "clusterIp" || t == "nodePort" || t == "externalName" {
×
1884
                        aobj.SetAttr("dnsName", dnsName)
×
1885
                }
×
1886
        }
1887
        for _, port := range service.Spec.Ports {
2✔
1888
                proto := getProtocolStr(port.Protocol)
1✔
1889
                p := apicapi.NewVmmInjectedSvcPort(aobjDn,
1✔
1890
                        strconv.Itoa(int(port.Port)), proto, port.TargetPort.String())
1✔
1891
                p.SetAttr("nodePort", strconv.Itoa(int(port.NodePort)))
1✔
1892
                aobj.AddChild(p)
1✔
1893
        }
1✔
1894
        if cont.config.EnableVmmInjectedLabels && service.ObjectMeta.Labels != nil && apicapi.ApicVersion >= "5.2" {
1✔
1895
                for key, val := range service.ObjectMeta.Labels {
×
1896
                        newLabelKey := cont.aciNameForKey("label", key)
×
1897
                        label := apicapi.NewVmmInjectedLabel(aobj.GetDn(),
×
1898
                                newLabelKey, val)
×
1899
                        aobj.AddChild(label)
×
1900
                }
×
1901
        }
1902
        name := cont.aciNameForKey("service-vmm", key)
1✔
1903
        cont.log.Debug("Write Service Object: ", aobj)
1✔
1904
        cont.apicConn.WriteApicObjects(name, apicapi.ApicSlice{aobj})
1✔
1905
        cont.log.Debugf("svcObject: %+v", aobj)
1✔
1906
}
1907

1908
func removeAllConditions(conditions []metav1.Condition, conditionType string) []metav1.Condition {
1✔
1909
        i := 0
1✔
1910
        for _, cond := range conditions {
1✔
1911
                if cond.Type != conditionType {
×
1912
                        conditions[i] = cond
×
1913
                }
×
1914
        }
1915
        return conditions[:i]
1✔
1916
}
1917

1918
func (cont *AciController) updateServiceCondition(service *v1.Service, success bool, reason string, message string) bool {
1✔
1919
        conditionType := "LbIpamAllocation"
1✔
1920

1✔
1921
        var condition metav1.Condition
1✔
1922
        if success {
2✔
1923
                condition.Status = metav1.ConditionTrue
1✔
1924
        } else {
2✔
1925
                condition.Status = metav1.ConditionFalse
1✔
1926
                condition.Message = message
1✔
1927
        }
1✔
1928
        condition.Type = conditionType
1✔
1929
        condition.Reason = reason
1✔
1930
        condition.LastTransitionTime = metav1.Time{time.Now()}
1✔
1931
        for _, cond := range service.Status.Conditions {
2✔
1932
                if cond.Type == conditionType &&
1✔
1933
                        cond.Status == condition.Status &&
1✔
1934
                        cond.Message == condition.Message &&
1✔
1935
                        cond.Reason == condition.Reason {
2✔
1936
                        return false
1✔
1937
                }
1✔
1938
        }
1939

1940
        service.Status.Conditions = removeAllConditions(service.Status.Conditions, conditionType)
1✔
1941
        service.Status.Conditions = append(service.Status.Conditions, condition)
1✔
1942
        return true
1✔
1943
}
1944

1945
func (cont *AciController) validateRequestedIps(lbIpList []string) (net.IP, net.IP, bool) {
1✔
1946
        var ipv4, ipv6 net.IP
1✔
1947
        for _, lbIp := range lbIpList {
2✔
1948
                ip := net.ParseIP(lbIp)
1✔
1949
                if ip != nil {
2✔
1950
                        if ip.To4() != nil {
2✔
1951
                                if ipv4.Equal(net.IP{}) {
2✔
1952
                                        ipv4 = ip
1✔
1953
                                } else {
2✔
1954
                                        cont.log.Error("Annotation should have only one ipv4")
1✔
1955
                                        return ipv4, ipv6, false
1✔
1956
                                }
1✔
1957
                        } else if ip.To16() != nil {
2✔
1958
                                if ipv6.Equal(net.IP{}) {
2✔
1959
                                        ipv6 = ip
1✔
1960
                                } else {
2✔
1961
                                        cont.log.Error("Annotation should have only one ipv6")
1✔
1962
                                        return ipv4, ipv6, false
1✔
1963
                                }
1✔
1964
                        }
1965
                }
1966
        }
1967
        return ipv4, ipv6, true
1✔
1968
}
1969

1970
func (cont *AciController) returnUnusedStaticIngressIps(staticIngressIps, requestedIps []net.IP) {
1✔
1971
        for _, staticIp := range staticIngressIps {
2✔
1972
                found := false
1✔
1973
                for _, reqIp := range requestedIps {
2✔
1974
                        if reqIp.Equal(staticIp) {
2✔
1975
                                found = true
1✔
1976
                        }
1✔
1977
                }
1978
                if !found {
2✔
1979
                        returnIps(cont.staticServiceIps, []net.IP{staticIp})
1✔
1980
                }
1✔
1981
        }
1982
}
1983

1984
func (cont *AciController) allocateServiceIps(servicekey string,
1985
        service *v1.Service) bool {
1✔
1986
        logger := serviceLogger(cont.log, service)
1✔
1987
        cont.indexMutex.Lock()
1✔
1988
        meta, ok := cont.serviceMetaCache[servicekey]
1✔
1989
        if !ok {
2✔
1990
                meta = &serviceMeta{}
1✔
1991
                cont.serviceMetaCache[servicekey] = meta
1✔
1992

1✔
1993
                // Read any existing IPs and attempt to allocate them to the pod
1✔
1994
                for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1995
                        ip := net.ParseIP(ingress.IP)
1✔
1996
                        if ip == nil {
1✔
1997
                                continue
×
1998
                        }
1999
                        if ip.To4() != nil {
2✔
2000
                                if cont.serviceIps.GetV4IpCache()[0].RemoveIp(ip) {
2✔
2001
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
2002
                                } else if cont.staticServiceIps.V4.RemoveIp(ip) {
3✔
2003
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
2004
                                }
1✔
2005
                        } else if ip.To16() != nil {
2✔
2006
                                if cont.serviceIps.GetV6IpCache()[0].RemoveIp(ip) {
2✔
2007
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
2008
                                } else if cont.staticServiceIps.V6.RemoveIp(ip) {
3✔
2009
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
2010
                                }
1✔
2011
                        }
2012
                }
2013
        }
2014

2015
        if !cont.serviceSyncEnabled {
2✔
2016
                cont.indexMutex.Unlock()
1✔
2017
                return false
1✔
2018
        }
1✔
2019

2020
        var requestedIps []net.IP
1✔
2021
        // try to give the requested load balancer IP to the pod
1✔
2022
        lbIps, ok := service.ObjectMeta.Annotations[metadata.LbIpAnnotation]
1✔
2023
        if ok {
2✔
2024
                lbIpList := strings.Split(lbIps, ",")
1✔
2025
                ipv4, ipv6, valid := cont.validateRequestedIps(lbIpList)
1✔
2026
                if valid {
2✔
2027
                        if ipv4 != nil {
2✔
2028
                                requestedIps = append(requestedIps, ipv4)
1✔
2029
                        }
1✔
2030
                        if ipv6 != nil {
2✔
2031
                                requestedIps = append(requestedIps, ipv6)
1✔
2032
                        }
1✔
2033
                } else {
1✔
2034
                        cont.returnServiceIps(meta.ingressIps)
1✔
2035
                        cont.log.Error("Invalid LB IP annotation for service ", servicekey)
1✔
2036
                        condUpdated := cont.updateServiceCondition(service, false, "InvalidAnnotation", "Invalid Loadbalancer IP annotation")
1✔
2037
                        if condUpdated {
2✔
2038
                                _, err := cont.updateServiceStatus(service)
1✔
2039
                                if err != nil {
1✔
2040
                                        logger.Error("Failed to update service status : ", err)
×
2041
                                        cont.indexMutex.Unlock()
×
2042
                                        return true
×
2043
                                }
×
2044
                        }
2045
                        cont.indexMutex.Unlock()
1✔
2046
                        return false
1✔
2047
                }
2048
        } else {
1✔
2049
                requestedIp := net.ParseIP(service.Spec.LoadBalancerIP)
1✔
2050
                if requestedIp != nil {
2✔
2051
                        requestedIps = append(requestedIps, requestedIp)
1✔
2052
                }
1✔
2053
        }
2054
        if len(requestedIps) > 0 {
2✔
2055
                meta.requestedIps = []net.IP{}
1✔
2056
                for _, requestedIp := range requestedIps {
2✔
2057
                        hasRequestedIp := false
1✔
2058
                        for _, ip := range meta.staticIngressIps {
2✔
2059
                                if reflect.DeepEqual(requestedIp, ip) {
2✔
2060
                                        hasRequestedIp = true
1✔
2061
                                }
1✔
2062
                        }
2063
                        if !hasRequestedIp {
2✔
2064
                                if requestedIp.To4() != nil &&
1✔
2065
                                        cont.staticServiceIps.V4.RemoveIp(requestedIp) {
2✔
2066
                                        hasRequestedIp = true
1✔
2067
                                } else if requestedIp.To16() != nil &&
2✔
2068
                                        cont.staticServiceIps.V6.RemoveIp(requestedIp) {
2✔
2069
                                        hasRequestedIp = true
1✔
2070
                                }
1✔
2071
                        }
2072
                        if hasRequestedIp {
2✔
2073
                                meta.requestedIps = append(meta.requestedIps, requestedIp)
1✔
2074
                        }
1✔
2075
                }
2076
                cont.returnUnusedStaticIngressIps(meta.staticIngressIps, meta.requestedIps)
1✔
2077
                meta.staticIngressIps = meta.requestedIps
1✔
2078
                cont.returnServiceIps(meta.ingressIps)
1✔
2079
                meta.ingressIps = nil
1✔
2080
                // If no requested ips are allocatable
1✔
2081
                if len(meta.requestedIps) < 1 {
2✔
2082
                        logger.Error("No Requested Ip addresses available for service ", servicekey)
1✔
2083
                        condUpdated := cont.updateServiceCondition(service, false, "RequestedIpsNotAllocatable", "The requested ips for loadbalancer service are not available or not in extern static range")
1✔
2084
                        if condUpdated {
2✔
2085
                                _, err := cont.updateServiceStatus(service)
1✔
2086
                                if err != nil {
1✔
2087
                                        cont.indexMutex.Unlock()
×
2088
                                        logger.Error("Failed to update service status: ", err)
×
2089
                                        return true
×
2090
                                }
×
2091
                        }
2092
                        cont.indexMutex.Unlock()
1✔
2093
                        return false
1✔
2094
                }
2095
        } else if len(meta.requestedIps) > 0 {
1✔
2096
                meta.requestedIps = nil
×
2097
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
×
2098
                meta.staticIngressIps = nil
×
2099
        }
×
2100
        ingressIps := make([]net.IP, 0)
1✔
2101
        ingressIps = append(ingressIps, meta.ingressIps...)
1✔
2102
        ingressIps = append(ingressIps, meta.staticIngressIps...)
1✔
2103

1✔
2104
        var ipv4, ipv6 net.IP
1✔
2105
        for _, ip := range ingressIps {
2✔
2106
                if ip.To4() != nil {
2✔
2107
                        ipv4 = ip
1✔
2108
                } else if ip.To16() != nil {
3✔
2109
                        ipv6 = ip
1✔
2110
                }
1✔
2111
        }
2112
        var clusterIPv4, clusterIPv6 net.IP
1✔
2113
        clusterIPs := append([]string{service.Spec.ClusterIP}, service.Spec.ClusterIPs...)
1✔
2114
        for _, ipStr := range clusterIPs {
2✔
2115
                ip := net.ParseIP(ipStr)
1✔
2116
                if ip == nil {
1✔
2117
                        continue
×
2118
                }
2119
                if ip.To4() != nil && clusterIPv4 == nil {
2✔
2120
                        clusterIPv4 = ip
1✔
2121
                } else if ip.To16() != nil && strings.Contains(ip.String(), ":") && clusterIPv6 == nil {
3✔
2122
                        clusterIPv6 = ip
1✔
2123
                }
1✔
2124
        }
2125
        if clusterIPv4 != nil && ipv4 == nil {
2✔
2126
                if len(requestedIps) < 1 {
2✔
2127
                        ipv4, _ = cont.serviceIps.AllocateIp(true)
1✔
2128
                        if ipv4 != nil {
2✔
2129
                                ingressIps = append(ingressIps, ipv4)
1✔
2130
                        }
1✔
2131
                }
2132
        } else if clusterIPv4 == nil && ipv4 != nil {
1✔
2133
                cont.removeIpFromIngressIPList(&ingressIps, ipv4)
×
2134
        }
×
2135

2136
        if clusterIPv6 != nil && ipv6 == nil {
2✔
2137
                if len(requestedIps) < 1 {
2✔
2138
                        ipv6, _ = cont.serviceIps.AllocateIp(false)
1✔
2139
                        if ipv6 != nil {
2✔
2140
                                ingressIps = append(ingressIps, ipv6)
1✔
2141
                        }
1✔
2142
                }
2143
        } else if clusterIPv6 == nil && ipv6 != nil {
1✔
2144
                cont.removeIpFromIngressIPList(&ingressIps, ipv6)
×
2145
        }
×
2146

2147
        if len(requestedIps) < 1 {
2✔
2148
                meta.ingressIps = ingressIps
1✔
2149
        }
1✔
2150
        if ipv4 == nil && ipv6 == nil {
2✔
2151
                logger.Error("No IP addresses available for service")
1✔
2152
                cont.indexMutex.Unlock()
1✔
2153
                return true
1✔
2154
        }
1✔
2155
        cont.indexMutex.Unlock()
1✔
2156
        var newIngress []v1.LoadBalancerIngress
1✔
2157
        for _, ip := range meta.ingressIps {
2✔
2158
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2159
        }
1✔
2160
        for _, ip := range meta.staticIngressIps {
2✔
2161
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2162
        }
1✔
2163

2164
        ipUpdated := false
1✔
2165
        if !reflect.DeepEqual(newIngress, service.Status.LoadBalancer.Ingress) {
2✔
2166
                service.Status.LoadBalancer.Ingress = newIngress
1✔
2167

1✔
2168
                logger.WithFields(logrus.Fields{
1✔
2169
                        "status": service.Status.LoadBalancer.Ingress,
1✔
2170
                }).Info("Updating service load balancer status")
1✔
2171

1✔
2172
                ipUpdated = true
1✔
2173
        }
1✔
2174

2175
        success := true
1✔
2176
        reason := "Success"
1✔
2177
        message := ""
1✔
2178
        if len(requestedIps) > 0 && len(requestedIps) != len(meta.staticIngressIps) {
1✔
2179
                success = false
×
2180
                reason = "OneIpNotAllocatable"
×
2181
                message = "One of the requested Ips is not allocatable"
×
2182
        }
×
2183
        condUpdated := cont.updateServiceCondition(service, success, reason, message)
1✔
2184
        if ipUpdated || condUpdated {
2✔
2185
                _, err := cont.updateServiceStatus(service)
1✔
2186
                if err != nil {
1✔
2187
                        logger.Error("Failed to update service status: ", err)
×
2188
                        return true
×
2189
                }
×
2190
        }
2191
        return false
1✔
2192
}
2193

2194
func (cont *AciController) handleServiceDelete(servicekey string) bool {
1✔
2195
        if cont.isCNOEnabled() {
1✔
2196
                return false
×
2197
        }
×
2198
        cont.clearLbService(servicekey)
1✔
2199
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("service-vmm",
1✔
2200
                servicekey))
1✔
2201
        return false
1✔
2202
}
2203

2204
func (cont *AciController) handleServiceUpdate(service *v1.Service) bool {
1✔
2205
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2206
        if err != nil {
1✔
2207
                serviceLogger(cont.log, service).
×
2208
                        Error("Could not create service key: ", err)
×
2209
                return false
×
2210
        }
×
2211
        if cont.isCNOEnabled() {
1✔
2212
                return false
×
2213
        }
×
2214
        var requeue bool
1✔
2215
        isLoadBalancer := service.Spec.Type == v1.ServiceTypeLoadBalancer
1✔
2216
        if isLoadBalancer {
2✔
2217
                if *cont.config.AllocateServiceIps {
2✔
2218
                        requeue = cont.allocateServiceIps(servicekey, service)
1✔
2219
                }
1✔
2220
                cont.indexMutex.Lock()
1✔
2221
                if cont.serviceSyncEnabled {
2✔
2222
                        cont.indexMutex.Unlock()
1✔
2223
                        err = cont.updateServiceDeviceInstance(servicekey, service)
1✔
2224
                        if err != nil {
1✔
2225
                                serviceLogger(cont.log, service).
×
2226
                                        Error("Failed to update service device Instance: ", err)
×
2227
                                return true
×
2228
                        }
×
2229
                } else {
1✔
2230
                        cont.indexMutex.Unlock()
1✔
2231
                }
1✔
2232
        } else {
1✔
2233
                cont.clearLbService(servicekey)
1✔
2234
        }
1✔
2235
        cont.writeApicSvc(servicekey, service)
1✔
2236
        return requeue
1✔
2237
}
2238

2239
func (cont *AciController) clearLbService(servicekey string) {
1✔
2240
        cont.indexMutex.Lock()
1✔
2241
        if meta, ok := cont.serviceMetaCache[servicekey]; ok {
2✔
2242
                cont.returnServiceIps(meta.ingressIps)
1✔
2243
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
1✔
2244
                delete(cont.serviceMetaCache, servicekey)
1✔
2245
        }
1✔
2246
        cont.indexMutex.Unlock()
1✔
2247
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("svc", servicekey))
1✔
2248
}
2249

2250
func getEndpointsIps(endpoints *v1.Endpoints) map[string]bool {
1✔
2251
        ips := make(map[string]bool)
1✔
2252
        for _, subset := range endpoints.Subsets {
2✔
2253
                for _, addr := range subset.Addresses {
2✔
2254
                        ips[addr.IP] = true
1✔
2255
                }
1✔
2256
                for _, addr := range subset.NotReadyAddresses {
1✔
2257
                        ips[addr.IP] = true
×
2258
                }
×
2259
        }
2260
        return ips
1✔
2261
}
2262

2263
func (cont *AciController) processServiceTargetPorts(service *v1.Service, svcKey string, old bool) map[string]targetPort {
1✔
2264
        ports := make(map[string]targetPort)
1✔
2265
        for _, port := range service.Spec.Ports {
2✔
2266
                var key string
1✔
2267
                portnums := make(map[int]bool)
1✔
2268

1✔
2269
                if port.TargetPort.Type == intstr.String {
1✔
2270
                        entry, exists := cont.namedPortServiceIndex[svcKey]
×
2271
                        if !old {
×
2272
                                if !exists {
×
2273
                                        cont.log.Debugf("Creating named port index for service: %s, port: %s", svcKey, port.Name)
×
2274
                                        newEntry := make(namedPortServiceIndexEntry)
×
2275
                                        entry = &newEntry
×
2276
                                }
×
2277
                                (*entry)[port.Name] = &namedPortServiceIndexPort{
×
2278
                                        targetPortName: port.TargetPort.String(),
×
2279
                                        resolvedPorts:  make(map[int]bool),
×
2280
                                }
×
2281
                                cont.namedPortServiceIndex[svcKey] = entry
×
2282
                        } else if exists {
×
2283
                                delete(*entry, port.Name)
×
2284
                                cont.log.Debugf("Removed named port index for service: %s port: %s, entry: %v", svcKey, port.Name, entry)
×
2285
                                if len(*entry) == 0 {
×
2286
                                        delete(cont.namedPortServiceIndex, svcKey)
×
2287
                                } else {
×
2288
                                        cont.namedPortServiceIndex[svcKey] = entry
×
2289
                                }
×
2290
                        }
2291
                        key = portProto(&port.Protocol) + "-name-" + port.TargetPort.String()
×
2292
                } else {
1✔
2293
                        portNum := port.TargetPort.IntValue()
1✔
2294
                        if portNum <= 0 {
2✔
2295
                                portNum = int(port.Port)
1✔
2296
                        }
1✔
2297
                        key = portProto(&port.Protocol) + "-num-" + strconv.Itoa(portNum)
1✔
2298
                        portnums[portNum] = true
1✔
2299
                }
2300

2301
                ports[key] = targetPort{
1✔
2302
                        proto: port.Protocol,
1✔
2303
                        ports: portnums,
1✔
2304
                }
1✔
2305
        }
2306
        return ports
1✔
2307
}
2308

2309
func (cont *AciController) endpointsAdded(obj interface{}) {
1✔
2310
        endpoints := obj.(*v1.Endpoints)
1✔
2311
        servicekey, err := cache.MetaNamespaceKeyFunc(obj.(*v1.Endpoints))
1✔
2312
        if err != nil {
1✔
2313
                cont.log.Error("Could not create service key: ", err)
×
2314
                return
×
2315
        }
×
2316

2317
        ips := getEndpointsIps(endpoints)
1✔
2318
        cont.indexMutex.Lock()
1✔
2319
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2320
        cont.queueIPNetPolUpdates(ips)
1✔
2321
        cont.indexMutex.Unlock()
1✔
2322

1✔
2323
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2324

1✔
2325
        cont.queueServiceUpdateByKey(servicekey)
1✔
2326
}
2327

2328
func (cont *AciController) endpointsDeleted(obj interface{}) {
1✔
2329
        endpoints, isEndpoints := obj.(*v1.Endpoints)
1✔
2330
        if !isEndpoints {
1✔
2331
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2332
                if !ok {
×
2333
                        cont.log.Error("Received unexpected object: ", obj)
×
2334
                        return
×
2335
                }
×
2336
                endpoints, ok = deletedState.Obj.(*v1.Endpoints)
×
2337
                if !ok {
×
2338
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpoints object: ", deletedState.Obj)
×
2339
                        return
×
2340
                }
×
2341
        }
2342
        servicekey, err := cache.MetaNamespaceKeyFunc(endpoints)
1✔
2343
        if err != nil {
1✔
2344
                cont.log.Error("Could not create service key: ", err)
×
2345
                return
×
2346
        }
×
2347

2348
        ips := getEndpointsIps(endpoints)
1✔
2349
        cont.indexMutex.Lock()
1✔
2350
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2351
        cont.queueIPNetPolUpdates(ips)
1✔
2352
        cont.indexMutex.Unlock()
1✔
2353

1✔
2354
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2355

1✔
2356
        cont.queueServiceUpdateByKey(servicekey)
1✔
2357
}
2358

2359
func (cont *AciController) endpointsUpdated(oldEps, newEps interface{}) {
1✔
2360
        oldendpoints := oldEps.(*v1.Endpoints)
1✔
2361
        newendpoints := newEps.(*v1.Endpoints)
1✔
2362
        servicekey, err := cache.MetaNamespaceKeyFunc(newendpoints)
1✔
2363
        if err != nil {
1✔
2364
                cont.log.Error("Could not create service key: ", err)
×
2365
                return
×
2366
        }
×
2367

2368
        oldIps := getEndpointsIps(oldendpoints)
1✔
2369
        newIps := getEndpointsIps(newendpoints)
1✔
2370
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2371
                cont.indexMutex.Lock()
1✔
2372
                cont.queueIPNetPolUpdates(oldIps)
1✔
2373
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2374
                cont.queueIPNetPolUpdates(newIps)
1✔
2375
                cont.indexMutex.Unlock()
1✔
2376
        }
1✔
2377

2378
        if !reflect.DeepEqual(oldendpoints.Subsets, newendpoints.Subsets) {
2✔
2379
                cont.queueEndpointsNetPolUpdates(oldendpoints)
1✔
2380
                cont.queueEndpointsNetPolUpdates(newendpoints)
1✔
2381
        }
1✔
2382

2383
        cont.queueServiceUpdateByKey(servicekey)
1✔
2384
}
2385

2386
func (cont *AciController) serviceAdded(obj interface{}) {
1✔
2387
        service := obj.(*v1.Service)
1✔
2388
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2389
        if err != nil {
1✔
2390
                serviceLogger(cont.log, service).
×
2391
                        Error("Could not create service key: ", err)
×
2392
                return
×
2393
        }
×
2394

2395
        cont.indexMutex.Lock()
1✔
2396
        ports := cont.processServiceTargetPorts(service, servicekey, false)
1✔
2397
        cont.queuePortNetPolUpdates(ports)
1✔
2398
        cont.updateTargetPortIndex(true, servicekey, nil, ports)
1✔
2399
        cont.indexMutex.Unlock()
1✔
2400

1✔
2401
        cont.queueServiceUpdateByKey(servicekey)
1✔
2402
}
2403

2404
func (cont *AciController) serviceUpdated(oldSvc, newSvc interface{}) {
1✔
2405
        oldservice := oldSvc.(*v1.Service)
1✔
2406
        newservice := newSvc.(*v1.Service)
1✔
2407
        servicekey, err := cache.MetaNamespaceKeyFunc(newservice)
1✔
2408
        if err != nil {
1✔
2409
                serviceLogger(cont.log, newservice).
×
2410
                        Error("Could not create service key: ", err)
×
2411
                return
×
2412
        }
×
2413
        if !reflect.DeepEqual(oldservice.Spec.Ports, newservice.Spec.Ports) {
1✔
2414
                cont.indexMutex.Lock()
×
2415
                oldPorts := cont.processServiceTargetPorts(oldservice, servicekey, true)
×
2416
                newPorts := cont.processServiceTargetPorts(newservice, servicekey, false)
×
2417
                cont.queuePortNetPolUpdates(oldPorts)
×
2418
                cont.updateTargetPortIndex(true, servicekey, oldPorts, newPorts)
×
2419
                cont.queuePortNetPolUpdates(newPorts)
×
2420
                cont.indexMutex.Unlock()
×
2421
        }
×
2422
        cont.queueServiceUpdateByKey(servicekey)
1✔
2423
}
2424

2425
func (cont *AciController) serviceDeleted(obj interface{}) {
1✔
2426
        service, isService := obj.(*v1.Service)
1✔
2427
        if !isService {
1✔
2428
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2429
                if !ok {
×
2430
                        serviceLogger(cont.log, service).
×
2431
                                Error("Received unexpected object: ", obj)
×
2432
                        return
×
2433
                }
×
2434
                service, ok = deletedState.Obj.(*v1.Service)
×
2435
                if !ok {
×
2436
                        serviceLogger(cont.log, service).
×
2437
                                Error("DeletedFinalStateUnknown contained non-Services object: ", deletedState.Obj)
×
2438
                        return
×
2439
                }
×
2440
        }
2441
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2442
        if err != nil {
1✔
2443
                serviceLogger(cont.log, service).
×
2444
                        Error("Could not create service key: ", err)
×
2445
                return
×
2446
        }
×
2447

2448
        cont.indexMutex.Lock()
1✔
2449
        ports := cont.processServiceTargetPorts(service, servicekey, true)
1✔
2450
        cont.updateTargetPortIndex(true, servicekey, ports, nil)
1✔
2451
        cont.queuePortNetPolUpdates(ports)
1✔
2452
        delete(cont.snatServices, servicekey)
1✔
2453
        cont.indexMutex.Unlock()
1✔
2454

1✔
2455
        deletedServiceKey := "DELETED_" + servicekey
1✔
2456
        cont.queueServiceUpdateByKey(deletedServiceKey)
1✔
2457
}
2458

2459
func (cont *AciController) serviceFullSync() {
1✔
2460
        cache.ListAll(cont.serviceIndexer, labels.Everything(),
1✔
2461
                func(sobj interface{}) {
2✔
2462
                        cont.queueServiceUpdate(sobj.(*v1.Service))
1✔
2463
                })
1✔
2464
}
2465

2466
func (cont *AciController) getEndpointSliceIps(endpointSlice *discovery.EndpointSlice) map[string]bool {
1✔
2467
        ips := make(map[string]bool)
1✔
2468
        for _, endpoints := range endpointSlice.Endpoints {
2✔
2469
                for _, addr := range endpoints.Addresses {
2✔
2470
                        ips[addr] = true
1✔
2471
                }
1✔
2472
        }
2473
        return ips
1✔
2474
}
2475

2476
func (cont *AciController) notReadyEndpointPresent(endpointSlice *discovery.EndpointSlice) bool {
×
2477
        for _, endpoints := range endpointSlice.Endpoints {
×
2478
                if (endpoints.Conditions.Ready != nil && !*endpoints.Conditions.Ready) &&
×
2479
                        (endpoints.Conditions.Terminating == nil || !*endpoints.Conditions.Terminating) {
×
2480
                        return true
×
2481
                }
×
2482
        }
2483
        return false
×
2484
}
2485

2486
func (cont *AciController) getEndpointSliceEpIps(endpoints *discovery.Endpoint) map[string]bool {
×
2487
        ips := make(map[string]bool)
×
2488
        for _, addr := range endpoints.Addresses {
×
2489
                ips[addr] = true
×
2490
        }
×
2491
        return ips
×
2492
}
2493

2494
func (cont *AciController) processDelayedEpSlices() {
1✔
2495
        var processEps []DelayedEpSlice
1✔
2496
        cont.indexMutex.Lock()
1✔
2497
        for i := 0; i < len(cont.delayedEpSlices); i++ {
1✔
2498
                delayedepslice := cont.delayedEpSlices[i]
×
2499
                if time.Now().After(delayedepslice.DelayedTime) {
×
2500
                        var toprocess DelayedEpSlice
×
2501
                        err := util.DeepCopyObj(&delayedepslice, &toprocess)
×
2502
                        if err != nil {
×
2503
                                cont.log.Error(err)
×
2504
                                continue
×
2505
                        }
2506
                        processEps = append(processEps, toprocess)
×
2507
                        cont.delayedEpSlices = append(cont.delayedEpSlices[:i], cont.delayedEpSlices[i+1:]...)
×
2508
                }
2509
        }
2510

2511
        cont.indexMutex.Unlock()
1✔
2512
        for _, epslice := range processEps {
1✔
2513
                //ignore the epslice if newly added endpoint is not ready
×
2514
                if cont.notReadyEndpointPresent(epslice.NewEpSlice) {
×
2515
                        cont.log.Debug("Ignoring the update as the new endpoint is not ready : ", epslice.NewEpSlice)
×
2516
                } else {
×
2517
                        cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice)
×
2518
                        cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice)
×
2519
                }
×
2520
        }
2521
}
2522

2523
func (cont *AciController) resolveServiceNamedPortFromEpSlice(epSlice *discovery.EndpointSlice, serviceKey string, old bool) {
1✔
2524
        indexEntry, ok := cont.namedPortServiceIndex[serviceKey]
1✔
2525
        if !ok {
2✔
2526
                return
1✔
2527
        }
1✔
2528
        for _, port := range epSlice.Ports {
×
2529
                if port.Name == nil || port.Port == nil {
×
2530
                        continue
×
2531
                }
2532
                if portEntry, ok := (*indexEntry)[*port.Name]; ok && portEntry != nil {
×
2533
                        portNum := int(*port.Port)
×
2534
                        if old {
×
2535
                                delete(portEntry.resolvedPorts, portNum)
×
2536
                                cont.log.Debugf("Deleting port: %d from service %s resolved target port. Resolved ports: %v", portNum, serviceKey, portEntry.resolvedPorts)
×
2537
                        } else {
×
2538
                                portEntry.resolvedPorts[portNum] = true
×
2539
                                cont.log.Debugf("Adding port: %d to service %s resolved target port. Resolved ports: %v", portNum, serviceKey, portEntry.resolvedPorts)
×
2540
                        }
×
2541
                        key := portProto(port.Protocol) + "-num-" + strconv.Itoa(portNum)
×
2542
                        targetPortIndexEntry := cont.targetPortIndex[key]
×
2543
                        if targetPortIndexEntry == nil && len(portEntry.resolvedPorts) == 1 {
×
2544
                                targetPortIndexEntry = &portIndexEntry{
×
2545
                                        port: targetPort{
×
2546
                                                proto: *port.Protocol,
×
2547
                                                ports: make(map[int]bool),
×
2548
                                        },
×
2549
                                        serviceKeys:       make(map[string]bool),
×
2550
                                        networkPolicyKeys: make(map[string]bool),
×
2551
                                }
×
2552
                                targetPortIndexEntry.port.ports[portNum] = true
×
2553
                                cont.targetPortIndex[key] = targetPortIndexEntry
×
2554
                        }
×
2555
                        if targetPortIndexEntry != nil {
×
2556
                                if len(portEntry.resolvedPorts) == 1 {
×
2557
                                        targetPortIndexEntry.serviceKeys[serviceKey] = true
×
2558
                                } else {
×
2559
                                        delete(targetPortIndexEntry.serviceKeys, serviceKey)
×
2560
                                }
×
2561
                        }
2562
                }
2563
        }
2564
}
2565
func (cont *AciController) endpointSliceAdded(obj interface{}) {
1✔
2566
        endpointslice, ok := obj.(*discovery.EndpointSlice)
1✔
2567
        if !ok {
1✔
2568
                cont.log.Error("error processing Endpointslice object: ", obj)
×
2569
                return
×
2570
        }
×
2571
        servicekey, valid := getServiceKey(endpointslice)
1✔
2572
        if !valid {
1✔
2573
                return
×
2574
        }
×
2575
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2576
        cont.indexMutex.Lock()
1✔
2577
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2578
        cont.resolveServiceNamedPortFromEpSlice(endpointslice, servicekey, false)
1✔
2579
        cont.queueIPNetPolUpdates(ips)
1✔
2580
        cont.indexMutex.Unlock()
1✔
2581

1✔
2582
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2583

1✔
2584
        cont.queueServiceUpdateByKey(servicekey)
1✔
2585
        cont.log.Info("EndPointSlice Object Added: ", servicekey)
1✔
2586
}
2587

2588
func (cont *AciController) endpointSliceDeleted(obj interface{}) {
1✔
2589
        endpointslice, isEndpointslice := obj.(*discovery.EndpointSlice)
1✔
2590
        if !isEndpointslice {
1✔
2591
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2592
                if !ok {
×
2593
                        cont.log.Error("Received unexpected object: ", obj)
×
2594
                        return
×
2595
                }
×
2596
                endpointslice, ok = deletedState.Obj.(*discovery.EndpointSlice)
×
2597
                if !ok {
×
2598
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpointslice object: ", deletedState.Obj)
×
2599
                        return
×
2600
                }
×
2601
        }
2602
        servicekey, valid := getServiceKey(endpointslice)
1✔
2603
        if !valid {
1✔
2604
                return
×
2605
        }
×
2606
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2607
        cont.indexMutex.Lock()
1✔
2608
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2609
        cont.resolveServiceNamedPortFromEpSlice(endpointslice, servicekey, true)
1✔
2610
        cont.queueIPNetPolUpdates(ips)
1✔
2611
        cont.indexMutex.Unlock()
1✔
2612
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2613
        cont.queueServiceUpdateByKey(servicekey)
1✔
2614
}
2615

2616
// Checks if the given service is present in the user configured list of services
2617
// for pbr delay and if present, returns the servie specific delay if configured
2618
func (cont *AciController) svcInAddDelayList(name, ns string) (int, bool) {
×
2619
        for _, svc := range cont.config.ServiceGraphEndpointAddDelay.Services {
×
2620
                if svc.Name == name && svc.Namespace == ns {
×
2621
                        return svc.Delay, true
×
2622
                }
×
2623
        }
2624
        return 0, false
×
2625
}
2626

2627
// Check if the endpointslice update notification has any deletion of enpoint
2628
func (cont *AciController) isDeleteEndpointSlice(oldendpointslice, newendpointslice *discovery.EndpointSlice) bool {
×
2629
        del := false
×
2630

×
2631
        // if any endpoint is removed from endpontslice
×
2632
        if len(newendpointslice.Endpoints) < len(oldendpointslice.Endpoints) {
×
2633
                del = true
×
2634
        }
×
2635

2636
        if !del {
×
2637
                // if any one of the endpoint is in terminating state
×
2638
                for _, endpoint := range newendpointslice.Endpoints {
×
2639
                        if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
×
2640
                                del = true
×
2641
                                break
×
2642
                        }
2643
                }
2644
        }
2645
        if !del {
×
2646
                // if any one of endpoint moved from ready state to not-ready state
×
2647
                for ix := range oldendpointslice.Endpoints {
×
2648
                        oldips := cont.getEndpointSliceEpIps(&oldendpointslice.Endpoints[ix])
×
2649
                        for newIx := range newendpointslice.Endpoints {
×
2650
                                newips := cont.getEndpointSliceEpIps(&newendpointslice.Endpoints[newIx])
×
2651
                                if reflect.DeepEqual(oldips, newips) {
×
2652
                                        if (oldendpointslice.Endpoints[ix].Conditions.Ready != nil && *oldendpointslice.Endpoints[ix].Conditions.Ready) &&
×
2653
                                                (newendpointslice.Endpoints[newIx].Conditions.Ready != nil && !*newendpointslice.Endpoints[newIx].Conditions.Ready) {
×
2654
                                                del = true
×
2655
                                        }
×
2656
                                        break
×
2657
                                }
2658
                        }
2659
                }
2660
        }
2661
        return del
×
2662
}
2663

2664
func (cont *AciController) doendpointSliceUpdatedDelay(oldendpointslice *discovery.EndpointSlice,
2665
        newendpointslice *discovery.EndpointSlice) {
×
2666
        svc, ns, valid := getServiceNameAndNs(newendpointslice)
×
2667
        if !valid {
×
2668
                return
×
2669
        }
×
2670
        svckey, valid := getServiceKey(newendpointslice)
×
2671
        if !valid {
×
2672
                return
×
2673
        }
×
2674
        delay := cont.config.ServiceGraphEndpointAddDelay.Delay
×
2675
        svcDelay, exists := cont.svcInAddDelayList(svc, ns)
×
2676
        if svcDelay > 0 {
×
2677
                delay = svcDelay
×
2678
        }
×
2679
        delayedsvc := exists && delay > 0
×
2680
        if delayedsvc {
×
2681
                cont.log.Debug("Delay of ", delay, " seconds is applicable for svc :", svc, " in ns: ", ns)
×
2682
                var delayedepslice DelayedEpSlice
×
2683
                delayedepslice.OldEpSlice = oldendpointslice
×
2684
                delayedepslice.ServiceKey = svckey
×
2685
                delayedepslice.NewEpSlice = newendpointslice
×
2686
                currentTime := time.Now()
×
2687
                delayedepslice.DelayedTime = currentTime.Add(time.Duration(delay) * time.Second)
×
2688
                cont.indexMutex.Lock()
×
2689
                cont.delayedEpSlices = append(cont.delayedEpSlices, &delayedepslice)
×
2690
                cont.indexMutex.Unlock()
×
2691
        } else {
×
2692
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2693
        }
×
2694

2695
        if delayedsvc && cont.isDeleteEndpointSlice(oldendpointslice, newendpointslice) {
×
2696
                cont.log.Debug("Proceeding by ignoring delay as the update is due to delete of endpoint")
×
2697
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2698
        }
×
2699
}
2700

2701
func (cont *AciController) endpointSliceUpdated(oldobj, newobj interface{}) {
1✔
2702
        oldendpointslice, ok := oldobj.(*discovery.EndpointSlice)
1✔
2703
        if !ok {
1✔
2704
                cont.log.Error("error processing Endpointslice object: ", oldobj)
×
2705
                return
×
2706
        }
×
2707
        newendpointslice, ok := newobj.(*discovery.EndpointSlice)
1✔
2708
        if !ok {
1✔
2709
                cont.log.Error("error processing Endpointslice object: ", newobj)
×
2710
                return
×
2711
        }
×
2712
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2713
                cont.doendpointSliceUpdatedDelay(oldendpointslice, newendpointslice)
×
2714
        } else {
1✔
2715
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
1✔
2716
        }
1✔
2717
}
2718

2719
func (cont *AciController) doendpointSliceUpdated(oldendpointslice *discovery.EndpointSlice,
2720
        newendpointslice *discovery.EndpointSlice) {
1✔
2721
        servicekey, valid := getServiceKey(newendpointslice)
1✔
2722
        if !valid {
1✔
2723
                return
×
2724
        }
×
2725
        oldIps := cont.getEndpointSliceIps(oldendpointslice)
1✔
2726
        newIps := cont.getEndpointSliceIps(newendpointslice)
1✔
2727
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2728
                cont.indexMutex.Lock()
1✔
2729
                cont.resolveServiceNamedPortFromEpSlice(oldendpointslice, servicekey, true)
1✔
2730
                cont.resolveServiceNamedPortFromEpSlice(newendpointslice, servicekey, false)
1✔
2731
                cont.queueIPNetPolUpdates(oldIps)
1✔
2732
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2733
                cont.queueIPNetPolUpdates(newIps)
1✔
2734
                cont.indexMutex.Unlock()
1✔
2735
        }
1✔
2736

2737
        if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) {
2✔
2738
                cont.queueEndpointSliceNetPolUpdates(oldendpointslice)
1✔
2739
                cont.queueEndpointSliceNetPolUpdates(newendpointslice)
1✔
2740
        }
1✔
2741
        cont.log.Debug("EndPointSlice Object Update: ", servicekey)
1✔
2742
        cont.queueServiceUpdateByKey(servicekey)
1✔
2743
}
2744

2745
func (cont *AciController) queueEndpointSliceNetPolUpdates(endpointslice *discovery.EndpointSlice) {
1✔
2746
        for _, endpoint := range endpointslice.Endpoints {
2✔
2747
                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" ||
1✔
2748
                        endpoint.TargetRef.Namespace == "" || endpoint.TargetRef.Name == "" {
2✔
2749
                        continue
1✔
2750
                }
2751
                if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
1✔
2752
                        continue
×
2753
                }
2754
                podkey := endpoint.TargetRef.Namespace + "/" + endpoint.TargetRef.Name
1✔
2755
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
2756
                ps := make(map[string]bool)
1✔
2757
                for _, npkey := range npkeys {
2✔
2758
                        cont.queueNetPolUpdateByKey(npkey)
1✔
2759
                }
1✔
2760
                // Process if the  any matching namedport wildcard policy is present
2761
                // ignore np already processed policies
2762
                cont.queueMatchingNamedNp(ps, podkey)
1✔
2763
        }
2764
}
2765

2766
func getServiceKey(endPointSlice *discovery.EndpointSlice) (string, bool) {
1✔
2767
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
1✔
2768
        if !ok {
1✔
2769
                return "", false
×
2770
        }
×
2771
        return endPointSlice.ObjectMeta.Namespace + "/" + serviceName, true
1✔
2772
}
2773

2774
func getServiceNameAndNs(endPointSlice *discovery.EndpointSlice) (string, string, bool) {
×
2775
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
×
2776
        if !ok {
×
2777
                return "", "", false
×
2778
        }
×
2779
        return serviceName, endPointSlice.ObjectMeta.Namespace, true
×
2780
}
2781

2782
// can be called with index lock
2783
func (sep *serviceEndpoint) UpdateServicesForNode(nodename string) {
1✔
2784
        cont := sep.cont
1✔
2785
        cache.ListAll(cont.endpointsIndexer, labels.Everything(),
1✔
2786
                func(endpointsobj interface{}) {
2✔
2787
                        endpoints := endpointsobj.(*v1.Endpoints)
1✔
2788
                        for _, subset := range endpoints.Subsets {
2✔
2789
                                for _, addr := range subset.Addresses {
2✔
2790
                                        if addr.NodeName != nil && *addr.NodeName == nodename {
2✔
2791
                                                servicekey, err :=
1✔
2792
                                                        cache.MetaNamespaceKeyFunc(endpointsobj.(*v1.Endpoints))
1✔
2793
                                                if err != nil {
1✔
2794
                                                        cont.log.Error("Could not create endpoints key: ", err)
×
2795
                                                        return
×
2796
                                                }
×
2797
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2798
                                                return
1✔
2799
                                        }
2800
                                }
2801
                        }
2802
                })
2803
}
2804

2805
func (seps *serviceEndpointSlice) UpdateServicesForNode(nodename string) {
1✔
2806
        // 1. List all the endpointslice and check for matching nodename
1✔
2807
        // 2. if it matches trigger the Service update and mark it visited
1✔
2808
        cont := seps.cont
1✔
2809
        visited := make(map[string]bool)
1✔
2810
        cache.ListAll(cont.endpointSliceIndexer, labels.Everything(),
1✔
2811
                func(endpointSliceobj interface{}) {
2✔
2812
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2813
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2814
                                if endpoint.NodeName != nil && *endpoint.NodeName == nodename {
2✔
2815
                                        servicekey, valid := getServiceKey(endpointSlices)
1✔
2816
                                        if !valid {
1✔
2817
                                                return
×
2818
                                        }
×
2819
                                        if _, ok := visited[servicekey]; !ok {
2✔
2820
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2821
                                                visited[servicekey] = true
1✔
2822
                                                return
1✔
2823
                                        }
1✔
2824
                                }
2825
                        }
2826
                })
2827
}
2828
func (cont *AciController) setNodeMap(nodeMap map[string]*metadata.ServiceEndpoint, nodeName string) {
1✔
2829
        nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
2830
        if !ok {
1✔
2831
                return
×
2832
        }
×
2833
        _, ok = cont.fabricPathForNode(nodeName)
1✔
2834
        if !ok {
2✔
2835
                return
1✔
2836
        }
1✔
2837
        nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
2838
}
2839

2840
// 2 cases when epslices corresponding to given service is presnt in delayedEpSlices:
2841
//  1. endpoint not present in delayedEpSlices of the service
2842
//  2. endpoint present in delayedEpSlices of the service but in not ready state
2843
//
2844
// indexMutex lock must be acquired before calling the function
2845
func (cont *AciController) isDelayedEndpoint(endpoint *discovery.Endpoint, svckey string) bool {
×
2846
        delayed := false
×
2847
        endpointips := cont.getEndpointSliceEpIps(endpoint)
×
2848
        for _, delayedepslices := range cont.delayedEpSlices {
×
2849
                if delayedepslices.ServiceKey == svckey {
×
2850
                        var found bool
×
2851
                        epslice := delayedepslices.OldEpSlice
×
2852
                        for ix := range epslice.Endpoints {
×
2853
                                epips := cont.getEndpointSliceEpIps(&epslice.Endpoints[ix])
×
2854
                                if reflect.DeepEqual(endpointips, epips) {
×
2855
                                        // case 2
×
2856
                                        if epslice.Endpoints[ix].Conditions.Ready != nil && !*epslice.Endpoints[ix].Conditions.Ready {
×
2857
                                                delayed = true
×
2858
                                        }
×
2859
                                        found = true
×
2860
                                }
2861
                        }
2862
                        // case 1
2863
                        if !found {
×
2864
                                delayed = true
×
2865
                        }
×
2866
                }
2867
        }
2868
        return delayed
×
2869
}
2870

2871
// set nodemap only if endoint is ready and not in delayedEpSlices
2872
func (cont *AciController) setNodeMapDelay(nodeMap map[string]*metadata.ServiceEndpoint,
2873
        endpoint *discovery.Endpoint, service *v1.Service) {
×
2874
        svckey, err := cache.MetaNamespaceKeyFunc(service)
×
2875
        if err != nil {
×
2876
                cont.log.Error("Could not create service key: ", err)
×
2877
                return
×
2878
        }
×
2879
        if cont.config.NoWaitForServiceEpReadiness ||
×
2880
                (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {
×
2881
                if endpoint.NodeName != nil && *endpoint.NodeName != "" {
×
2882
                        // donot setNodeMap for endpoint if:
×
2883
                        //   endpoint is newly added
×
2884
                        //   endpoint status changed from not ready to ready
×
2885
                        if !cont.isDelayedEndpoint(endpoint, svckey) {
×
2886
                                cont.setNodeMap(nodeMap, *endpoint.NodeName)
×
2887
                        }
×
2888
                }
2889
        }
2890
}
2891

2892
func (sep *serviceEndpoint) GetnodesMetadata(key string,
2893
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2894
        cont := sep.cont
1✔
2895
        endpointsobj, exists, err := cont.endpointsIndexer.GetByKey(key)
1✔
2896
        if err != nil {
1✔
2897
                cont.log.Error("Could not lookup endpoints for " +
×
2898
                        key + ": " + err.Error())
×
2899
        }
×
2900
        if exists && endpointsobj != nil {
2✔
2901
                endpoints := endpointsobj.(*v1.Endpoints)
1✔
2902
                for _, subset := range endpoints.Subsets {
2✔
2903
                        for _, addr := range subset.Addresses {
2✔
2904
                                if addr.NodeName == nil {
2✔
2905
                                        continue
1✔
2906
                                }
2907
                                cont.setNodeMap(nodeMap, *addr.NodeName)
1✔
2908
                        }
2909
                }
2910
        }
2911
        cont.log.Info("NodeMap: ", nodeMap)
1✔
2912
}
2913

2914
func (seps *serviceEndpointSlice) GetnodesMetadata(key string,
2915
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2916
        cont := seps.cont
1✔
2917
        // 1. Get all the Endpoint slices matching the label service-name
1✔
2918
        // 2. update the node map matching with endpoints nodes name
1✔
2919
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2920
        selector := labels.SelectorFromSet(label)
1✔
2921
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2922
                func(endpointSliceobj interface{}) {
2✔
2923
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2924
                        for ix := range endpointSlices.Endpoints {
2✔
2925
                                if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2926
                                        cont.setNodeMapDelay(nodeMap, &endpointSlices.Endpoints[ix], service)
×
2927
                                } else if cont.config.NoWaitForServiceEpReadiness ||
1✔
2928
                                        (endpointSlices.Endpoints[ix].Conditions.Ready != nil && *endpointSlices.Endpoints[ix].Conditions.Ready) {
2✔
2929
                                        if endpointSlices.Endpoints[ix].NodeName != nil && *endpointSlices.Endpoints[ix].NodeName != "" {
2✔
2930
                                                cont.setNodeMap(nodeMap, *endpointSlices.Endpoints[ix].NodeName)
1✔
2931
                                        }
1✔
2932
                                }
2933
                        }
2934
                })
2935
        cont.log.Debug("NodeMap: ", nodeMap)
1✔
2936
}
2937

2938
func (sep *serviceEndpoint) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2939
        cont := sep.cont
1✔
2940
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
2941
        if err != nil {
1✔
2942
                serviceLogger(cont.log, service).
×
2943
                        Error("Could not create service key: ", err)
×
2944
                return false
×
2945
        }
×
2946
        endpointsobj, _, err := cont.endpointsIndexer.GetByKey(key)
1✔
2947
        if err != nil {
1✔
2948
                cont.log.Error("Could not lookup endpoints for " +
×
2949
                        key + ": " + err.Error())
×
2950
                return false
×
2951
        }
×
2952
        if endpointsobj != nil {
2✔
2953
                for _, subset := range endpointsobj.(*v1.Endpoints).Subsets {
2✔
2954
                        for _, addr := range subset.Addresses {
2✔
2955
                                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" {
1✔
2956
                                        continue
×
2957
                                }
2958
                                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(),
1✔
2959
                                        addr.TargetRef.Name))
1✔
2960
                        }
2961
                }
2962
        }
2963
        return true
1✔
2964
}
2965

2966
func (seps *serviceEndpointSlice) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2967
        cont := seps.cont
1✔
2968
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2969
        selector := labels.SelectorFromSet(label)
1✔
2970
        epcount := 0
1✔
2971
        childs := make(map[string]struct{})
1✔
2972
        var exists = struct{}{}
1✔
2973
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2974
                func(endpointSliceobj interface{}) {
2✔
2975
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2976
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2977
                                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
1✔
2978
                                        continue
×
2979
                                }
2980
                                epcount++
1✔
2981
                                childs[endpoint.TargetRef.Name] = exists
1✔
2982
                                cont.log.Debug("EndPoint added: ", endpoint.TargetRef.Name)
1✔
2983
                        }
2984
                })
2985
        for child := range childs {
2✔
2986
                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(), child))
1✔
2987
        }
1✔
2988
        return epcount != 0
1✔
2989
}
2990

2991
func getProtocolStr(proto v1.Protocol) string {
1✔
2992
        var protostring string
1✔
2993
        switch proto {
1✔
2994
        case v1.ProtocolUDP:
1✔
2995
                protostring = "udp"
1✔
2996
        case v1.ProtocolTCP:
1✔
2997
                protostring = "tcp"
1✔
2998
        case v1.ProtocolSCTP:
×
2999
                protostring = "sctp"
×
3000
        default:
×
3001
                protostring = "tcp"
×
3002
        }
3003
        return protostring
1✔
3004
}
3005

3006
func (cont *AciController) removeIpFromIngressIPList(ingressIps *[]net.IP, ip net.IP) {
×
3007
        cont.returnServiceIps([]net.IP{ip})
×
3008
        index := -1
×
3009
        for i, v := range *ingressIps {
×
3010
                if v.Equal(ip) {
×
3011
                        index = i
×
3012
                        break
×
3013
                }
3014
        }
3015
        if index == -1 {
×
3016
                return
×
3017
        }
×
3018
        *ingressIps = append((*ingressIps)[:index], (*ingressIps)[index+1:]...)
×
3019
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc