• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 7030

08 Feb 2023 03:59AM UTC coverage: 56.705% (-0.3%) from 57.004%
7030

push

travis-ci-com

web-flow
Merge pull request #1067 from noironetworks/endointslice-update-fix-kmr2

Delete pbr programming of deleted endpoint right away

141 of 141 new or added lines in 2 files covered. (100.0%)

11866 of 20926 relevant lines covered (56.7%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.15
/pkg/controller/services.go
1
// Copyright 2016 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "errors"
19
        "fmt"
20
        "net"
21
        "reflect"
22
        "sort"
23
        "strconv"
24
        "strings"
25
        "time"
26

27
        "github.com/noironetworks/aci-containers/pkg/apicapi"
28
        "github.com/noironetworks/aci-containers/pkg/metadata"
29
        "github.com/noironetworks/aci-containers/pkg/util"
30
        "github.com/sirupsen/logrus"
31
        v1 "k8s.io/api/core/v1"
32
        v1beta1 "k8s.io/api/discovery/v1beta1"
33
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34
        "k8s.io/apimachinery/pkg/fields"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/client-go/kubernetes"
37
        "k8s.io/client-go/tools/cache"
38
)
39

40
// Default service contract scope value
41
const DefaultServiceContractScope = "context"
42

43
// Default service ext subnet scope - enable shared security
44
const DefaultServiceExtSubNetShared = false
45

46
func (cont *AciController) initEndpointsInformerFromClient(
47
        kubeClient kubernetes.Interface) {
×
48

×
49
        cont.initEndpointsInformerBase(
×
50
                cache.NewListWatchFromClient(
×
51
                        kubeClient.CoreV1().RESTClient(), "endpoints",
×
52
                        metav1.NamespaceAll, fields.Everything()))
×
53
}
×
54

55
func (cont *AciController) initEndpointSliceInformerFromClient(
56
        kubeClient kubernetes.Interface) {
×
57

×
58
        cont.initEndpointSliceInformerBase(
×
59
                cache.NewListWatchFromClient(
×
60
                        kubeClient.DiscoveryV1beta1().RESTClient(), "endpointslices",
×
61
                        metav1.NamespaceAll, fields.Everything()))
×
62
}
×
63

64
func (cont *AciController) initEndpointSliceInformerBase(listWatch *cache.ListWatch) {
1✔
65
        cont.endpointSliceIndexer, cont.endpointSliceInformer = cache.NewIndexerInformer(
1✔
66
                listWatch, &v1beta1.EndpointSlice{}, 0,
1✔
67
                cache.ResourceEventHandlerFuncs{
1✔
68
                        AddFunc: func(obj interface{}) {
2✔
69
                                cont.endpointSliceAdded(obj)
1✔
70
                        },
1✔
71
                        UpdateFunc: func(oldobj interface{}, newobj interface{}) {
1✔
72
                                cont.endpointSliceUpdated(oldobj, newobj)
1✔
73
                        },
1✔
74
                        DeleteFunc: func(obj interface{}) {
1✔
75
                                cont.endpointSliceDeleted(obj)
1✔
76
                        },
1✔
77
                },
78
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
79
        )
80
}
81

82
func (cont *AciController) initEndpointsInformerBase(listWatch *cache.ListWatch) {
1✔
83
        cont.endpointsIndexer, cont.endpointsInformer = cache.NewIndexerInformer(
1✔
84
                listWatch, &v1.Endpoints{}, 0,
1✔
85
                cache.ResourceEventHandlerFuncs{
1✔
86
                        AddFunc: func(obj interface{}) {
2✔
87
                                cont.endpointsAdded(obj)
1✔
88
                        },
1✔
89
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
90
                                cont.endpointsUpdated(old, new)
1✔
91
                        },
1✔
92
                        DeleteFunc: func(obj interface{}) {
1✔
93
                                cont.endpointsDeleted(obj)
1✔
94
                        },
1✔
95
                },
96
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
97
        )
98
}
99

100
func (cont *AciController) initServiceInformerFromClient(
101
        kubeClient *kubernetes.Clientset) {
×
102

×
103
        cont.initServiceInformerBase(
×
104
                cache.NewListWatchFromClient(
×
105
                        kubeClient.CoreV1().RESTClient(), "services",
×
106
                        metav1.NamespaceAll, fields.Everything()))
×
107
}
×
108

109
func (cont *AciController) initServiceInformerBase(listWatch *cache.ListWatch) {
1✔
110
        cont.serviceIndexer, cont.serviceInformer = cache.NewIndexerInformer(
1✔
111
                listWatch, &v1.Service{}, 0,
1✔
112
                cache.ResourceEventHandlerFuncs{
1✔
113
                        AddFunc: func(obj interface{}) {
2✔
114
                                cont.serviceAdded(obj)
1✔
115
                        },
1✔
116
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
117
                                cont.serviceUpdated(old, new)
1✔
118
                        },
1✔
119
                        DeleteFunc: func(obj interface{}) {
×
120
                                cont.serviceDeleted(obj)
×
121
                        },
×
122
                },
123
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
124
        )
125
}
126

127
func serviceLogger(log *logrus.Logger, as *v1.Service) *logrus.Entry {
1✔
128
        return log.WithFields(logrus.Fields{
1✔
129
                "namespace": as.ObjectMeta.Namespace,
1✔
130
                "name":      as.ObjectMeta.Name,
1✔
131
                "type":      as.Spec.Type,
1✔
132
        })
1✔
133
}
1✔
134

135
func (cont *AciController) queueIPNetPolUpdates(ips map[string]bool) {
1✔
136
        for ipStr := range ips {
2✔
137
                ip := net.ParseIP(ipStr)
1✔
138
                if ip == nil {
1✔
139
                        continue
×
140
                }
141
                entries, err := cont.netPolSubnetIndex.ContainingNetworks(ip)
1✔
142
                if err != nil {
1✔
143
                        cont.log.Error("Corrupted network policy IP index, err: ", err)
×
144
                        return
×
145
                }
×
146
                for _, entry := range entries {
2✔
147
                        for npkey := range entry.(*ipIndexEntry).keys {
2✔
148
                                cont.queueNetPolUpdateByKey(npkey)
1✔
149
                        }
1✔
150
                }
151
        }
152
}
153

154
func (cont *AciController) queuePortNetPolUpdates(ports map[string]targetPort) {
1✔
155
        for portkey := range ports {
2✔
156
                entry, _ := cont.targetPortIndex[portkey]
1✔
157
                if entry == nil {
2✔
158
                        continue
1✔
159
                }
160
                for npkey := range entry.networkPolicyKeys {
2✔
161
                        cont.queueNetPolUpdateByKey(npkey)
1✔
162
                }
1✔
163
        }
164
}
165

166
func (cont *AciController) queueNetPolForEpAddrs(addrs []v1.EndpointAddress) {
1✔
167
        for _, addr := range addrs {
2✔
168
                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" ||
1✔
169
                        addr.TargetRef.Namespace == "" || addr.TargetRef.Name == "" {
2✔
170
                        continue
1✔
171
                }
172
                podkey := addr.TargetRef.Namespace + "/" + addr.TargetRef.Name
1✔
173
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
174
                ps := make(map[string]bool)
1✔
175
                for _, npkey := range npkeys {
2✔
176
                        cont.queueNetPolUpdateByKey(npkey)
1✔
177
                        ps[npkey] = true
1✔
178
                }
1✔
179
                // Process if the  any matching namedport wildcard policy is present
180
                // ignore np already processed policies
181
                cont.queueMatchingNamedNp(ps, podkey)
1✔
182
        }
183
}
184

185
func (cont *AciController) queueMatchingNamedNp(served map[string]bool, podkey string) {
1✔
186
        cont.indexMutex.Lock()
1✔
187
        for npkey := range cont.nmPortNp {
2✔
188
                if _, ok := served[npkey]; !ok {
2✔
189
                        if cont.checkPodNmpMatchesNp(npkey, podkey) {
2✔
190
                                cont.queueNetPolUpdateByKey(npkey)
1✔
191
                        }
1✔
192
                }
193
        }
194
        cont.indexMutex.Unlock()
1✔
195

196
}
197
func (cont *AciController) queueEndpointsNetPolUpdates(endpoints *v1.Endpoints) {
1✔
198
        for _, subset := range endpoints.Subsets {
2✔
199
                cont.queueNetPolForEpAddrs(subset.Addresses)
1✔
200
                cont.queueNetPolForEpAddrs(subset.NotReadyAddresses)
1✔
201
        }
1✔
202
}
203

204
func (cont *AciController) returnServiceIps(ips []net.IP) {
1✔
205
        for _, ip := range ips {
2✔
206
                if ip.To4() != nil {
2✔
207
                        cont.serviceIps.DeallocateIp(ip)
1✔
208
                } else if ip.To16() != nil {
3✔
209
                        cont.serviceIps.DeallocateIp(ip)
1✔
210
                }
1✔
211
        }
212
}
213

214
func returnIps(pool *netIps, ips []net.IP) {
1✔
215
        for _, ip := range ips {
2✔
216
                if ip.To4() != nil {
2✔
217
                        pool.V4.AddIp(ip)
1✔
218
                } else if ip.To16() != nil {
3✔
219
                        pool.V6.AddIp(ip)
1✔
220
                }
1✔
221
        }
222
}
223

224
func (cont *AciController) staticMonPolName() string {
1✔
225
        return cont.aciNameForKey("monPol", cont.env.ServiceBd())
1✔
226
}
1✔
227

228
func (cont *AciController) staticMonPolDn() string {
1✔
229
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
230
                return fmt.Sprintf("uni/tn-%s/ipslaMonitoringPol-%s",
1✔
231
                        cont.config.AciVrfTenant, cont.staticMonPolName())
1✔
232
        }
1✔
233
        return ""
×
234
}
235

236
func (cont *AciController) staticServiceObjs() apicapi.ApicSlice {
1✔
237
        var serviceObjs apicapi.ApicSlice
1✔
238

1✔
239
        // Service bridge domain
1✔
240
        bdName := cont.aciNameForKey("bd", cont.env.ServiceBd())
1✔
241
        bd := apicapi.NewFvBD(cont.config.AciVrfTenant, bdName)
1✔
242
        bd.SetAttr("arpFlood", "yes")
1✔
243
        bd.SetAttr("ipLearning", "no")
1✔
244
        bd.SetAttr("unkMacUcastAct", "flood")
1✔
245
        bdToOut := apicapi.NewRsBdToOut(bd.GetDn(), cont.config.AciL3Out)
1✔
246
        bd.AddChild(bdToOut)
1✔
247
        bdToVrf := apicapi.NewRsCtx(bd.GetDn(), cont.config.AciVrf)
1✔
248
        bd.AddChild(bdToVrf)
1✔
249

1✔
250
        bdn := bd.GetDn()
1✔
251
        for _, cidr := range cont.config.NodeServiceSubnets {
2✔
252
                sn := apicapi.NewFvSubnet(bdn, cidr)
1✔
253
                bd.AddChild(sn)
1✔
254
        }
1✔
255
        serviceObjs = append(serviceObjs, bd)
1✔
256

1✔
257
        // Service IP SLA monitoring policy
1✔
258
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
259
                monPol := apicapi.NewFvIPSLAMonitoringPol(cont.config.AciVrfTenant,
1✔
260
                        cont.staticMonPolName())
1✔
261
                monPol.SetAttr("slaFrequency",
1✔
262
                        strconv.Itoa(cont.config.AciServiceMonitorInterval))
1✔
263
                serviceObjs = append(serviceObjs, monPol)
1✔
264
        }
1✔
265

266
        return serviceObjs
1✔
267
}
268

269
func (cont *AciController) initStaticServiceObjs() {
1✔
270
        cont.apicConn.WriteApicObjects(cont.config.AciPrefix+"_service_static",
1✔
271
                cont.staticServiceObjs())
1✔
272
}
1✔
273

274
func (cont *AciController) updateServicesForNode(nodename string) {
1✔
275
        cont.serviceEndPoints.UpdateServicesForNode(nodename)
1✔
276
}
1✔
277

278
// must have index lock
279
func (cont *AciController) getActiveFabricPathDn(node string) string {
×
280
        var fabricPathDn string
×
281
        sz := len(cont.nodeOpflexDevice[node])
×
282
        for i := range cont.nodeOpflexDevice[node] {
×
283
                device := cont.nodeOpflexDevice[node][sz-1-i]
×
284
                if device.GetAttrStr("state") == "connected" {
×
285
                        fabricPathDn = device.GetAttrStr("fabricPathDn")
×
286
                        break
×
287
                }
288
        }
289
        return fabricPathDn
×
290
}
291

292
func deleteDevicesFromList(delDevices apicapi.ApicSlice, devices apicapi.ApicSlice) apicapi.ApicSlice {
×
293
        var newDevices apicapi.ApicSlice
×
294
        for delDev := range delDevices {
×
295
                for _, device := range devices {
×
296
                        if !reflect.DeepEqual(delDev, device) {
×
297
                                newDevices = append(newDevices, device)
×
298
                        }
×
299
                }
300
        }
301
        return newDevices
×
302
}
303

304
func (cont *AciController) deleteOldOpflexDevices() {
1✔
305
        var nodeUpdates []string
1✔
306
        cont.indexMutex.Lock()
1✔
307
        for node, devices := range cont.nodeOpflexDevice {
1✔
308
                var delDevices apicapi.ApicSlice
×
309
                fabricPathDn := cont.getActiveFabricPathDn(node)
×
310
                if fabricPathDn != "" {
×
311
                        for _, device := range devices {
×
312
                                if device.GetAttrStr("delete") == "true" && device.GetAttrStr("fabricPathDn") != fabricPathDn {
×
313
                                        deleteTimeStr := device.GetAttrStr("deleteTime")
×
314
                                        deleteTime, err := time.Parse(time.RFC3339, deleteTimeStr)
×
315
                                        if err != nil {
×
316
                                                cont.log.Error("Failed to parse opflex device delete time: ", err)
×
317
                                                continue
×
318
                                        }
319
                                        now := time.Now()
×
320
                                        diff := now.Sub(deleteTime)
×
321
                                        if diff.Seconds() >= cont.config.OpflexDeviceDeleteTimeout {
×
322
                                                delDevices = append(delDevices, device)
×
323
                                        }
×
324
                                }
325
                        }
326
                        if len(delDevices) > 0 {
×
327
                                newDevices := deleteDevicesFromList(delDevices, devices)
×
328
                                cont.nodeOpflexDevice[node] = newDevices
×
329
                                if len(newDevices) == 0 {
×
330
                                        delete(cont.nodeOpflexDevice, node)
×
331
                                }
×
332
                                nodeUpdates = append(nodeUpdates, node)
×
333
                        }
334
                }
335
        }
336
        cont.indexMutex.Unlock()
1✔
337
        if len(nodeUpdates) > 0 {
1✔
338
                cont.postOpflexDeviceDelete(nodeUpdates)
×
339
        }
×
340
}
341

342
// must have index lock
343
func (cont *AciController) setDeleteFlagForOldDevices(node, fabricPathDn string) {
1✔
344
        for _, device := range cont.nodeOpflexDevice[node] {
2✔
345
                if device.GetAttrStr("fabricPathDn") != fabricPathDn {
1✔
346
                        t := time.Now()
×
347
                        device.SetAttr("delete", "true")
×
348
                        device.SetAttr("deleteTime", t.Format(time.RFC3339))
×
349
                }
×
350
        }
351
}
352

353
// must have index lock
354
func (cont *AciController) fabricPathForNode(name string) (string, bool) {
1✔
355
        sz := len(cont.nodeOpflexDevice[name])
1✔
356
        for i := range cont.nodeOpflexDevice[name] {
2✔
357
                device := cont.nodeOpflexDevice[name][sz-1-i]
1✔
358
                deviceState := device.GetAttrStr("state")
1✔
359
                if deviceState == "connected" {
2✔
360
                        if deviceState != device.GetAttrStr("prevState") {
2✔
361
                                cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Info("Processing fabric path for node ",
1✔
362
                                        "when connected device state is found")
1✔
363
                                device.SetAttr("prevState", deviceState)
1✔
364
                        }
1✔
365
                        fabricPathDn := device.GetAttrStr("fabricPathDn")
1✔
366
                        cont.setDeleteFlagForOldDevices(name, fabricPathDn)
1✔
367
                        return fabricPathDn, true
1✔
368
                } else {
1✔
369
                        device.SetAttr("prevState", deviceState)
1✔
370
                }
1✔
371
        }
372
        if sz > 0 {
2✔
373
                // When the opflex-device for a node changes, for example during a live migration,
1✔
374
                // we end up with both the old and the new device objects till the old object
1✔
375
                // ages out on APIC. The new object is at end of the devices list (see opflexDeviceChanged),
1✔
376
                // so we return the fabricPathDn of the last opflex-device.
1✔
377
                cont.fabricPathLogger(cont.nodeOpflexDevice[name][sz-1].GetAttrStr("hostName"),
1✔
378
                        cont.nodeOpflexDevice[name][sz-1]).Info("Processing fabricPathDn for node")
1✔
379
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("fabricPathDn"), true
1✔
380
        }
1✔
381
        return "", false
1✔
382
}
383

384
// must have index lock
385
func (cont *AciController) deviceMacForNode(name string) (string, bool) {
1✔
386
        sz := len(cont.nodeOpflexDevice[name])
1✔
387
        if sz > 0 {
2✔
388
                // When the opflex-device for a node changes, for example when the
1✔
389
                // node is recreated, we end up with both the old and the new
1✔
390
                // device objects till the old object ages out on APIC. The
1✔
391
                // new object is at end of the devices list (see
1✔
392
                // opflexDeviceChanged), so we return the MAC address of the
1✔
393
                // last opflex-device.
1✔
394
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("mac"), true
1✔
395
        }
1✔
396
        return "", false
1✔
397
}
398

399
func apicRedirectDst(rpDn string, ip string, mac string,
400
        descr string, healthGroupDn string, enablePbrTracking bool) apicapi.ApicObject {
1✔
401
        dst := apicapi.NewVnsRedirectDest(rpDn, ip, mac).SetAttr("descr", descr)
1✔
402
        if healthGroupDn != "" && enablePbrTracking {
2✔
403
                dst.AddChild(apicapi.NewVnsRsRedirectHealthGroup(dst.GetDn(),
1✔
404
                        healthGroupDn))
1✔
405
        }
1✔
406
        return dst
1✔
407
}
408

409
func (cont *AciController) apicRedirectPol(name string, tenantName string, nodes []string,
410
        nodeMap map[string]*metadata.ServiceEndpoint,
411
        monPolDn string, enablePbrTracking bool) (apicapi.ApicObject, string) {
1✔
412
        rp := apicapi.NewVnsSvcRedirectPol(tenantName, name)
1✔
413
        rp.SetAttr("thresholdDownAction", "deny")
1✔
414
        rpDn := rp.GetDn()
1✔
415
        for _, node := range nodes {
2✔
416
                cont.indexMutex.Lock()
1✔
417
                serviceEp, ok := nodeMap[node]
1✔
418
                if !ok {
1✔
419
                        continue
×
420
                }
421
                if serviceEp.Ipv4 != nil {
2✔
422
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv4.String(),
1✔
423
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
1✔
424
                }
1✔
425
                if serviceEp.Ipv6 != nil {
1✔
426
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv6.String(),
×
427
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
×
428
                }
×
429
                cont.indexMutex.Unlock()
1✔
430
        }
431
        if monPolDn != "" && enablePbrTracking {
2✔
432
                rp.AddChild(apicapi.NewVnsRsIPSLAMonitoringPol(rpDn, monPolDn))
1✔
433
        }
1✔
434
        return rp, rpDn
1✔
435
}
436

437
func apicExtNetCreate(enDn string, ingress string, ipv4 bool,
438
        cidr bool, sharedSec bool) apicapi.ApicObject {
1✔
439

1✔
440
        if !cidr {
2✔
441
                if ipv4 {
2✔
442
                        ingress = ingress + "/32"
1✔
443
                } else {
1✔
444
                        ingress = ingress + "/128"
×
445
                }
×
446
        }
447
        subnet := apicapi.NewL3extSubnet(enDn, ingress)
1✔
448
        if sharedSec {
2✔
449
                subnet.SetAttr("scope", "import-security,shared-security")
1✔
450
        }
1✔
451
        return subnet
1✔
452
}
453

454
func apicExtNet(name string, tenantName string, l3Out string,
455
        ingresses []string, sharedSecurity bool, snat bool) apicapi.ApicObject {
1✔
456

1✔
457
        en := apicapi.NewL3extInstP(tenantName, l3Out, name)
1✔
458
        enDn := en.GetDn()
1✔
459
        if snat {
2✔
460
                en.AddChild(apicapi.NewFvRsCons(enDn, name))
1✔
461
        } else {
2✔
462
                en.AddChild(apicapi.NewFvRsProv(enDn, name))
1✔
463
        }
1✔
464

465
        for _, ingress := range ingresses {
2✔
466
                ip, _, _ := net.ParseCIDR(ingress)
1✔
467
                // If ingress is a subnet
1✔
468
                if ip != nil {
2✔
469
                        if ip != nil && ip.To4() != nil {
2✔
470
                                subnet := apicExtNetCreate(enDn, ingress, true, true, sharedSecurity)
1✔
471
                                en.AddChild(subnet)
1✔
472
                        } else if ip != nil && ip.To16() != nil {
1✔
473
                                subnet := apicExtNetCreate(enDn, ingress, false, true, sharedSecurity)
×
474
                                en.AddChild(subnet)
×
475
                        }
×
476
                } else {
1✔
477
                        // If ingress is an IP address
1✔
478
                        ip := net.ParseIP(ingress)
1✔
479
                        if ip != nil && ip.To4() != nil {
2✔
480
                                subnet := apicExtNetCreate(enDn, ingress, true, false, sharedSecurity)
1✔
481
                                en.AddChild(subnet)
1✔
482
                        } else if ip != nil && ip.To16() != nil {
1✔
483
                                subnet := apicExtNetCreate(enDn, ingress, false, false, sharedSecurity)
×
484
                                en.AddChild(subnet)
×
485
                        }
×
486
                }
487
        }
488
        return en
1✔
489
}
490

491
func apicExtNetCons(conName string, tenantName string,
492
        l3Out string, net string) apicapi.ApicObject {
1✔
493

1✔
494
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
495
        return apicapi.NewFvRsCons(enDn, conName)
1✔
496
}
1✔
497

498
func apicExtNetProv(conName string, tenantName string,
499
        l3Out string, net string) apicapi.ApicObject {
1✔
500

1✔
501
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
502
        return apicapi.NewFvRsProv(enDn, conName)
1✔
503
}
1✔
504

505
// Helper function to check if a string item exists in a slice
506
func stringInSlice(str string, list []string) bool {
1✔
507
        for _, v := range list {
2✔
508
                if v == str {
2✔
509
                        return true
1✔
510
                }
1✔
511
        }
512
        return false
×
513
}
514

515
func validScope(scope string) bool {
1✔
516
        validValues := []string{"", "context", "tenant", "global"}
1✔
517
        return stringInSlice(scope, validValues)
1✔
518
}
1✔
519

520
func apicContract(conName string, tenantName string,
521
        graphName string, scopeName string, isSnatPbrFltrChain bool) apicapi.ApicObject {
1✔
522
        con := apicapi.NewVzBrCP(tenantName, conName)
1✔
523
        if scopeName != "" && scopeName != "context" {
2✔
524
                con.SetAttr("scope", scopeName)
1✔
525
        }
1✔
526
        cs := apicapi.NewVzSubj(con.GetDn(), "loadbalancedservice")
1✔
527
        csDn := cs.GetDn()
1✔
528
        if isSnatPbrFltrChain {
2✔
529
                cs.SetAttr("revFltPorts", "no")
1✔
530
                inTerm := apicapi.NewVzInTerm(csDn)
1✔
531
                outTerm := apicapi.NewVzOutTerm(csDn)
1✔
532
                inTerm.AddChild(apicapi.NewVzRsInTermGraphAtt(inTerm.GetDn(), graphName))
1✔
533
                inTerm.AddChild(apicapi.NewVzRsFiltAtt(inTerm.GetDn(), conName+"_fromCons-toProv"))
1✔
534
                outTerm.AddChild(apicapi.NewVzRsOutTermGraphAtt(outTerm.GetDn(), graphName))
1✔
535
                outTerm.AddChild(apicapi.NewVzRsFiltAtt(outTerm.GetDn(), conName+"_fromProv-toCons"))
1✔
536
                cs.AddChild(inTerm)
1✔
537
                cs.AddChild(outTerm)
1✔
538
        } else {
2✔
539
                cs.AddChild(apicapi.NewVzRsSubjGraphAtt(csDn, graphName))
1✔
540
                cs.AddChild(apicapi.NewVzRsSubjFiltAtt(csDn, conName))
1✔
541
        }
1✔
542
        con.AddChild(cs)
1✔
543
        return con
1✔
544
}
545

546
func apicDevCtx(name string, tenantName string,
547
        graphName string, bdName string, rpDn string, isSnatPbrFltrChain bool) apicapi.ApicObject {
1✔
548

1✔
549
        cc := apicapi.NewVnsLDevCtx(tenantName, name, graphName, "loadbalancer")
1✔
550
        ccDn := cc.GetDn()
1✔
551
        graphDn := fmt.Sprintf("uni/tn-%s/lDevVip-%s", tenantName, graphName)
1✔
552
        lifDn := fmt.Sprintf("%s/lIf-%s", graphDn, "interface")
1✔
553
        bdDn := fmt.Sprintf("uni/tn-%s/BD-%s", tenantName, bdName)
1✔
554
        cc.AddChild(apicapi.NewVnsRsLDevCtxToLDev(ccDn, graphDn))
1✔
555
        rpDnBase := rpDn
1✔
556
        for _, ctxConn := range []string{"consumer", "provider"} {
2✔
557
                lifCtx := apicapi.NewVnsLIfCtx(ccDn, ctxConn)
1✔
558
                if isSnatPbrFltrChain {
2✔
559
                        if ctxConn == "consumer" {
2✔
560
                                rpDn = rpDnBase + "_Cons"
1✔
561
                        } else {
2✔
562
                                rpDn = rpDnBase + "_Prov"
1✔
563
                        }
1✔
564
                }
565
                lifCtxDn := lifCtx.GetDn()
1✔
566
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToSvcRedirectPol(lifCtxDn, rpDn))
1✔
567
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToBD(lifCtxDn, bdDn))
1✔
568
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToLIf(lifCtxDn, lifDn))
1✔
569
                cc.AddChild(lifCtx)
1✔
570
        }
571
        return cc
1✔
572
}
573

574
func apicFilterEntry(filterDn string, count string, p_start string,
575
        p_end string, protocol string, stateful string, snat bool, outTerm bool) apicapi.ApicObject {
1✔
576

1✔
577
        fe := apicapi.NewVzEntry(filterDn, count)
1✔
578
        fe.SetAttr("etherT", "ip")
1✔
579
        fe.SetAttr("prot", protocol)
1✔
580
        if snat {
2✔
581
                if outTerm {
2✔
582
                        if protocol == "tcp" {
2✔
583
                                fe.SetAttr("tcpRules", "est")
1✔
584
                        }
1✔
585
                        // Reverse the ports for outTerm
586
                        fe.SetAttr("dFromPort", p_start)
1✔
587
                        fe.SetAttr("dToPort", p_end)
1✔
588
                } else {
1✔
589
                        fe.SetAttr("sFromPort", p_start)
1✔
590
                        fe.SetAttr("sToPort", p_end)
1✔
591
                }
1✔
592
        } else {
1✔
593
                fe.SetAttr("dFromPort", p_start)
1✔
594
                fe.SetAttr("dToPort", p_end)
1✔
595
        }
1✔
596
        fe.SetAttr("stateful", stateful)
1✔
597
        return fe
1✔
598
}
599
func apicFilter(name string, tenantName string,
600
        portSpec []v1.ServicePort, snat bool, snatRange portRangeSnat) apicapi.ApicObject {
1✔
601

1✔
602
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
603
        filterDn := filter.GetDn()
1✔
604

1✔
605
        var i int
1✔
606
        var port v1.ServicePort
1✔
607
        for i, port = range portSpec {
2✔
608
                pstr := strconv.Itoa(int(port.Port))
1✔
609
                proto := getProtocolStr(port.Protocol)
1✔
610
                fe := apicFilterEntry(filterDn, strconv.Itoa(i), pstr,
1✔
611
                        pstr, proto, "no", false, false)
1✔
612
                filter.AddChild(fe)
1✔
613
        }
1✔
614

615
        if snat {
1✔
616
                portSpec := []portRangeSnat{snatRange}
×
617
                p_start := strconv.Itoa(int(portSpec[0].start))
×
618
                p_end := strconv.Itoa(int(portSpec[0].end))
×
619

×
620
                fe1 := apicFilterEntry(filterDn, strconv.Itoa(i+1), p_start,
×
621
                        p_end, "tcp", "no", false, false)
×
622
                filter.AddChild(fe1)
×
623
                fe2 := apicFilterEntry(filterDn, strconv.Itoa(i+2), p_start,
×
624
                        p_end, "udp", "no", false, false)
×
625
                filter.AddChild(fe2)
×
626
        }
×
627
        return filter
1✔
628
}
629

630
func apicFilterSnat(name string, tenantName string,
631
        portSpec []portRangeSnat, outTerm bool) apicapi.ApicObject {
1✔
632

1✔
633
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
634
        filterDn := filter.GetDn()
1✔
635

1✔
636
        p_start := strconv.Itoa(int(portSpec[0].start))
1✔
637
        p_end := strconv.Itoa(int(portSpec[0].end))
1✔
638

1✔
639
        fe := apicFilterEntry(filterDn, "0", p_start,
1✔
640
                p_end, "tcp", "no", true, outTerm)
1✔
641
        filter.AddChild(fe)
1✔
642
        fe1 := apicFilterEntry(filterDn, "1", p_start,
1✔
643
                p_end, "udp", "no", true, outTerm)
1✔
644
        filter.AddChild(fe1)
1✔
645

1✔
646
        return filter
1✔
647
}
1✔
648

649
func (cont *AciController) updateServiceDeviceInstance(key string,
650
        service *v1.Service) error {
1✔
651
        cont.indexMutex.Lock()
1✔
652
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
653
        cont.serviceEndPoints.GetnodesMetadata(key, service, nodeMap)
1✔
654
        cont.indexMutex.Unlock()
1✔
655

1✔
656
        var nodes []string
1✔
657
        for node := range nodeMap {
2✔
658
                nodes = append(nodes, node)
1✔
659
        }
1✔
660
        sort.Strings(nodes)
1✔
661
        name := cont.aciNameForKey("svc", key)
1✔
662
        var conScope string
1✔
663
        scopeVal, ok := service.ObjectMeta.Annotations[metadata.ServiceContractScopeAnnotation]
1✔
664
        if ok {
2✔
665
                normScopeVal := strings.ToLower(scopeVal)
1✔
666
                if !validScope(normScopeVal) {
1✔
667
                        errString := "Invalid service contract scope value provided " + scopeVal
×
668
                        err := errors.New(errString)
×
669
                        serviceLogger(cont.log, service).Error("Could not create contract: ", err)
×
670
                        return err
×
671

×
672
                } else {
1✔
673
                        conScope = normScopeVal
1✔
674
                }
1✔
675
        } else {
1✔
676
                conScope = DefaultServiceContractScope
1✔
677
        }
1✔
678

679
        var sharedSecurity bool
1✔
680
        if conScope == "global" {
2✔
681
                sharedSecurity = true
1✔
682
        } else {
2✔
683
                sharedSecurity = DefaultServiceExtSubNetShared
1✔
684
        }
1✔
685

686
        graphName := cont.aciNameForKey("svc", "global")
1✔
687
        var serviceObjs apicapi.ApicSlice
1✔
688
        if len(nodes) > 0 {
2✔
689

1✔
690
                // 1. Service redirect policy
1✔
691
                // The service redirect policy contains the MAC address
1✔
692
                // and IP address of each of the service endpoints for
1✔
693
                // each node that hosts a pod for this service.  The
1✔
694
                // example below shows the case of two nodes.
1✔
695
                rp, rpDn :=
1✔
696
                        cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
1✔
697
                                nodeMap, cont.staticMonPolDn(), cont.config.AciPbrTrackingNonSnat)
1✔
698
                serviceObjs = append(serviceObjs, rp)
1✔
699

1✔
700
                // 2. Service graph contract and external network
1✔
701
                // The service graph contract must be bound to the service
1✔
702
                // graph.  This contract must be consumed by the default
1✔
703
                // layer 3 network and provided by the service layer 3
1✔
704
                // network.
1✔
705
                {
2✔
706
                        var ingresses []string
1✔
707
                        for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
708
                                ingresses = append(ingresses, ingress.IP)
1✔
709
                        }
1✔
710
                        serviceObjs = append(serviceObjs,
1✔
711
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
712
                                        cont.config.AciL3Out, ingresses, sharedSecurity, false))
1✔
713
                }
714

715
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, false)
1✔
716
                serviceObjs = append(serviceObjs, contract)
1✔
717
                for _, net := range cont.config.AciExtNetworks {
2✔
718
                        serviceObjs = append(serviceObjs,
1✔
719
                                apicExtNetCons(name, cont.config.AciVrfTenant,
1✔
720
                                        cont.config.AciL3Out, net))
1✔
721
                }
1✔
722

723
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
724
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
725

1✔
726
                _, snat := cont.snatServices[key]
1✔
727
                filter := apicFilter(name, cont.config.AciVrfTenant,
1✔
728
                        service.Spec.Ports, snat, defaultPortRange)
1✔
729
                serviceObjs = append(serviceObjs, filter)
1✔
730

1✔
731
                // 3. Device cluster context
1✔
732
                // The logical device context binds the service contract
1✔
733
                // to the redirect policy and the device cluster and
1✔
734
                // bridge domain for the device cluster.
1✔
735
                serviceObjs = append(serviceObjs,
1✔
736
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName,
1✔
737
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, false))
1✔
738
        }
739

740
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
741
        return nil
1✔
742
}
743

744
func (cont *AciController) updateServiceDeviceInstanceSnat(key string) error {
1✔
745
        nodeList := cont.nodeIndexer.List()
1✔
746
        cont.indexMutex.Lock()
1✔
747
        if len(cont.nodeServiceMetaCache) == 0 {
2✔
748
                cont.indexMutex.Unlock()
1✔
749
                return nil
1✔
750
        }
1✔
751
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
752
        for itr, nodeItem := range nodeList {
2✔
753
                if itr == cont.config.MaxSvcGraphNodes {
1✔
754
                        break
×
755
                }
756
                node := nodeItem.(*v1.Node)
1✔
757
                nodeName := node.ObjectMeta.Name
1✔
758
                nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
759
                if !ok {
2✔
760
                        continue
1✔
761
                }
762
                _, ok = cont.fabricPathForNode(nodeName)
1✔
763
                if !ok {
1✔
764
                        continue
×
765
                }
766
                nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
767
        }
768
        cont.indexMutex.Unlock()
1✔
769

1✔
770
        var nodes []string
1✔
771
        for node := range nodeMap {
2✔
772
                nodes = append(nodes, node)
1✔
773
        }
1✔
774
        sort.Strings(nodes)
1✔
775
        name := cont.aciNameForKey("snat", key)
1✔
776
        var conScope = cont.config.SnatSvcContractScope
1✔
777
        sharedSecurity := true
1✔
778

1✔
779
        graphName := cont.aciNameForKey("svc", "global")
1✔
780
        var serviceObjs apicapi.ApicSlice
1✔
781
        if len(nodes) > 0 {
2✔
782

1✔
783
                // 1. Service redirect policy
1✔
784
                // The service redirect policy contains the MAC address
1✔
785
                // and IP address of each of the service endpoints for
1✔
786
                // each node that hosts a pod for this service.
1✔
787
                // For SNAT with the introduction of filter-chain usage, to work-around
1✔
788
                // an APIC limitation, creating two PBR policies with same nodes.
1✔
789
                var rpDn string
1✔
790
                var rp apicapi.ApicObject
1✔
791
                if cont.apicConn.SnatPbrFltrChain {
2✔
792
                        rpCons, rpDnCons :=
1✔
793
                                cont.apicRedirectPol(name+"_Cons", cont.config.AciVrfTenant, nodes,
1✔
794
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
795
                        serviceObjs = append(serviceObjs, rpCons)
1✔
796
                        rpProv, _ :=
1✔
797
                                cont.apicRedirectPol(name+"_Prov", cont.config.AciVrfTenant, nodes,
1✔
798
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
799
                        serviceObjs = append(serviceObjs, rpProv)
1✔
800
                        rpDn = strings.TrimSuffix(rpDnCons, "_Cons")
1✔
801
                } else {
1✔
802
                        rp, rpDn =
×
803
                                cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
×
804
                                        nodeMap, cont.staticMonPolDn(), true)
×
805
                        serviceObjs = append(serviceObjs, rp)
×
806
                }
×
807
                // 2. Service graph contract and external network
808
                // The service graph contract must be bound to the
809
                // service
810
                // graph.  This contract must be consumed by the default
811
                // layer 3 network and provided by the service layer 3
812
                // network.
813
                {
1✔
814
                        var ingresses []string
1✔
815
                        for _, policy := range cont.snatPolicyCache {
2✔
816
                                ingresses = append(ingresses, policy.SnatIp...)
1✔
817
                        }
1✔
818
                        serviceObjs = append(serviceObjs,
1✔
819
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
820
                                        cont.config.AciL3Out, ingresses, sharedSecurity, true))
1✔
821
                }
822

823
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, cont.apicConn.SnatPbrFltrChain)
1✔
824
                serviceObjs = append(serviceObjs, contract)
1✔
825

1✔
826
                for _, net := range cont.config.AciExtNetworks {
2✔
827
                        serviceObjs = append(serviceObjs,
1✔
828
                                apicExtNetProv(name, cont.config.AciVrfTenant,
1✔
829
                                        cont.config.AciL3Out, net))
1✔
830
                }
1✔
831

832
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
833
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
834
                portSpec := []portRangeSnat{defaultPortRange}
1✔
835
                if cont.apicConn.SnatPbrFltrChain {
2✔
836
                        filterIn := apicFilterSnat(name+"_fromCons-toProv", cont.config.AciVrfTenant, portSpec, false)
1✔
837
                        serviceObjs = append(serviceObjs, filterIn)
1✔
838
                        filterOut := apicFilterSnat(name+"_fromProv-toCons", cont.config.AciVrfTenant, portSpec, true)
1✔
839
                        serviceObjs = append(serviceObjs, filterOut)
1✔
840
                } else {
1✔
841
                        filter := apicFilterSnat(name, cont.config.AciVrfTenant, portSpec, false)
×
842
                        serviceObjs = append(serviceObjs, filter)
×
843
                }
×
844
                // 3. Device cluster context
845
                // The logical device context binds the service contract
846
                // to the redirect policy and the device cluster and
847
                // bridge domain for the device cluster.
848
                serviceObjs = append(serviceObjs,
1✔
849
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName,
1✔
850
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, cont.apicConn.SnatPbrFltrChain))
1✔
851
        }
852
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
853
        return nil
1✔
854
}
855

856
func (cont *AciController) queueServiceUpdateByKey(key string) {
1✔
857
        cont.serviceQueue.Add(key)
1✔
858
}
1✔
859

860
func (cont *AciController) queueServiceUpdate(service *v1.Service) {
1✔
861
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
862
        if err != nil {
1✔
863
                serviceLogger(cont.log, service).
×
864
                        Error("Could not create service key: ", err)
×
865
                return
×
866
        }
×
867
        cont.serviceQueue.Add(key)
1✔
868
}
869

870
func apicDeviceCluster(name string, vrfTenant string,
871
        physDom string, encap string,
872
        nodes []string, nodeMap map[string]string) (apicapi.ApicObject, string) {
1✔
873

1✔
874
        dc := apicapi.NewVnsLDevVip(vrfTenant, name)
1✔
875
        dc.SetAttr("managed", "no")
1✔
876
        dcDn := dc.GetDn()
1✔
877
        dc.AddChild(apicapi.NewVnsRsALDevToPhysDomP(dcDn,
1✔
878
                fmt.Sprintf("uni/phys-%s", physDom)))
1✔
879
        lif := apicapi.NewVnsLIf(dcDn, "interface")
1✔
880
        lif.SetAttr("encap", encap)
1✔
881
        lifDn := lif.GetDn()
1✔
882

1✔
883
        for _, node := range nodes {
2✔
884
                path, ok := nodeMap[node]
1✔
885
                if !ok {
1✔
886
                        continue
×
887
                }
888

889
                cdev := apicapi.NewVnsCDev(dcDn, node)
1✔
890
                cif := apicapi.NewVnsCif(cdev.GetDn(), "interface")
1✔
891
                cif.AddChild(apicapi.NewVnsRsCIfPathAtt(cif.GetDn(), path))
1✔
892
                cdev.AddChild(cif)
1✔
893
                lif.AddChild(apicapi.NewVnsRsCIfAttN(lifDn, cif.GetDn()))
1✔
894
                dc.AddChild(cdev)
1✔
895
        }
896

897
        dc.AddChild(lif)
1✔
898

1✔
899
        return dc, dcDn
1✔
900
}
901

902
func apicServiceGraph(name string, tenantName string,
903
        dcDn string) apicapi.ApicObject {
1✔
904

1✔
905
        sg := apicapi.NewVnsAbsGraph(tenantName, name)
1✔
906
        sgDn := sg.GetDn()
1✔
907
        var provDn string
1✔
908
        var consDn string
1✔
909
        var cTermDn string
1✔
910
        var pTermDn string
1✔
911
        {
2✔
912
                an := apicapi.NewVnsAbsNode(sgDn, "loadbalancer")
1✔
913
                an.SetAttr("managed", "no")
1✔
914
                an.SetAttr("routingMode", "Redirect")
1✔
915
                anDn := an.GetDn()
1✔
916
                cons := apicapi.NewVnsAbsFuncConn(anDn, "consumer")
1✔
917
                consDn = cons.GetDn()
1✔
918
                an.AddChild(cons)
1✔
919
                prov := apicapi.NewVnsAbsFuncConn(anDn, "provider")
1✔
920
                provDn = prov.GetDn()
1✔
921
                an.AddChild(prov)
1✔
922
                an.AddChild(apicapi.NewVnsRsNodeToLDev(anDn, dcDn))
1✔
923
                sg.AddChild(an)
1✔
924
        }
1✔
925
        {
1✔
926
                tnc := apicapi.NewVnsAbsTermNodeCon(sgDn, "T1")
1✔
927
                tncDn := tnc.GetDn()
1✔
928
                cTerm := apicapi.NewVnsAbsTermConn(tncDn)
1✔
929
                cTermDn = cTerm.GetDn()
1✔
930
                tnc.AddChild(cTerm)
1✔
931
                tnc.AddChild(apicapi.NewVnsInTerm(tncDn))
1✔
932
                tnc.AddChild(apicapi.NewVnsOutTerm(tncDn))
1✔
933
                sg.AddChild(tnc)
1✔
934
        }
1✔
935
        {
1✔
936
                tnp := apicapi.NewVnsAbsTermNodeProv(sgDn, "T2")
1✔
937
                tnpDn := tnp.GetDn()
1✔
938
                pTerm := apicapi.NewVnsAbsTermConn(tnpDn)
1✔
939
                pTermDn = pTerm.GetDn()
1✔
940
                tnp.AddChild(pTerm)
1✔
941
                tnp.AddChild(apicapi.NewVnsInTerm(tnpDn))
1✔
942
                tnp.AddChild(apicapi.NewVnsOutTerm(tnpDn))
1✔
943
                sg.AddChild(tnp)
1✔
944
        }
1✔
945
        {
1✔
946
                acc := apicapi.NewVnsAbsConnection(sgDn, "C1")
1✔
947
                acc.SetAttr("connDir", "provider")
1✔
948
                accDn := acc.GetDn()
1✔
949
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, consDn))
1✔
950
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, cTermDn))
1✔
951
                sg.AddChild(acc)
1✔
952
        }
1✔
953
        {
1✔
954
                acp := apicapi.NewVnsAbsConnection(sgDn, "C2")
1✔
955
                acp.SetAttr("connDir", "provider")
1✔
956
                acpDn := acp.GetDn()
1✔
957
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, provDn))
1✔
958
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, pTermDn))
1✔
959
                sg.AddChild(acp)
1✔
960
        }
1✔
961
        return sg
1✔
962
}
963
func (cont *AciController) updateDeviceCluster() {
1✔
964
        nodeMap := make(map[string]string)
1✔
965

1✔
966
        cont.indexMutex.Lock()
1✔
967
        for node := range cont.nodeOpflexDevice {
2✔
968
                cont.log.Debug("Processing node in nodeOpflexDevice cache : ", node)
1✔
969
                fabricPath, ok := cont.fabricPathForNode(node)
1✔
970
                if !ok {
2✔
971
                        continue
1✔
972
                }
973
                nodeMap[node] = fabricPath
1✔
974
        }
975
        cont.indexMutex.Unlock()
1✔
976

1✔
977
        var nodes []string
1✔
978
        for node := range nodeMap {
2✔
979
                nodes = append(nodes, node)
1✔
980
        }
1✔
981
        sort.Strings(nodes)
1✔
982

1✔
983
        name := cont.aciNameForKey("svc", "global")
1✔
984
        var serviceObjs apicapi.ApicSlice
1✔
985

1✔
986
        // 1. Device cluster:
1✔
987
        // The device cluster is a set of physical paths that need to be
1✔
988
        // created for each node in the cluster, that correspond to the
1✔
989
        // service interface for each node.
1✔
990
        dc, dcDn := apicDeviceCluster(name, cont.config.AciVrfTenant,
1✔
991
                cont.config.AciServicePhysDom, cont.config.AciServiceEncap,
1✔
992
                nodes, nodeMap)
1✔
993
        serviceObjs = append(serviceObjs, dc)
1✔
994

1✔
995
        // 2. Service graph template
1✔
996
        // The service graph controls how the traffic will be redirected.
1✔
997
        // A service graph must be created for each device cluster.
1✔
998
        serviceObjs = append(serviceObjs,
1✔
999
                apicServiceGraph(name, cont.config.AciVrfTenant, dcDn))
1✔
1000

1✔
1001
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1002
}
1003

1004
func (cont *AciController) fabricPathLogger(node string,
1005
        obj apicapi.ApicObject) *logrus.Entry {
1✔
1006

1✔
1007
        return cont.log.WithFields(logrus.Fields{
1✔
1008
                "fabricPath": obj.GetAttr("fabricPathDn"),
1✔
1009
                "mac":        obj.GetAttr("mac"),
1✔
1010
                "node":       node,
1✔
1011
                "obj":        obj,
1✔
1012
        })
1✔
1013
}
1✔
1014

1015
func (cont *AciController) opflexDeviceChanged(obj apicapi.ApicObject) {
1✔
1016

1✔
1017
        devType := obj.GetAttrStr("devType")
1✔
1018
        domName := obj.GetAttrStr("domName")
1✔
1019
        ctrlrName := obj.GetAttrStr("ctrlrName")
1✔
1020

1✔
1021
        if (devType == cont.env.OpFlexDeviceType()) && (domName == cont.config.AciVmmDomain) && (ctrlrName == cont.config.AciVmmController) {
2✔
1022
                cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Processing opflex device update")
1✔
1023
                if obj.GetAttrStr("state") == "disconnected" {
2✔
1024
                        cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Opflex device disconnected")
1✔
1025
                        cont.indexMutex.Lock()
1✔
1026
                        for node, devices := range cont.nodeOpflexDevice {
1✔
1027
                                if node == obj.GetAttrStr("hostName") {
×
1028
                                        for _, device := range devices {
×
1029
                                                if device.GetDn() == obj.GetDn() {
×
1030
                                                        device.SetAttr("state", "disconnected")
×
1031
                                                        cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Debug("Opflex device cache updated for disconnected node")
×
1032
                                                }
×
1033
                                        }
1034
                                        cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", devices)
×
1035
                                        break
×
1036
                                }
1037
                        }
1038
                        cont.indexMutex.Unlock()
1✔
1039
                        cont.updateDeviceCluster()
1✔
1040
                        return
1✔
1041
                }
1042
                var nodeUpdates []string
1✔
1043

1✔
1044
                cont.indexMutex.Lock()
1✔
1045
                nodefound := false
1✔
1046
                for node, devices := range cont.nodeOpflexDevice {
2✔
1047
                        found := false
1✔
1048

1✔
1049
                        if node == obj.GetAttrStr("hostName") {
2✔
1050
                                nodefound = true
1✔
1051
                        }
1✔
1052

1053
                        for i, device := range devices {
2✔
1054
                                if device.GetDn() != obj.GetDn() {
2✔
1055
                                        continue
1✔
1056
                                }
1057
                                found = true
1✔
1058

1✔
1059
                                if obj.GetAttrStr("hostName") != node {
2✔
1060
                                        cont.fabricPathLogger(node, device).
1✔
1061
                                                Debug("Moving opflex device from node")
1✔
1062

1✔
1063
                                        devices = append(devices[:i], devices[i+1:]...)
1✔
1064
                                        cont.nodeOpflexDevice[node] = devices
1✔
1065
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1066
                                        break
1✔
1067
                                } else if (device.GetAttrStr("mac") != obj.GetAttrStr("mac")) ||
1✔
1068
                                        (device.GetAttrStr("fabricPathDn") != obj.GetAttrStr("fabricPathDn")) ||
1✔
1069
                                        (device.GetAttrStr("state") != obj.GetAttrStr("state")) {
2✔
1070

1✔
1071
                                        cont.fabricPathLogger(node, obj).
1✔
1072
                                                Debug("Updating opflex device")
1✔
1073

1✔
1074
                                        devices = append(append(devices[:i], devices[i+1:]...), obj)
1✔
1075
                                        cont.nodeOpflexDevice[node] = devices
1✔
1076
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1077
                                        break
1✔
1078
                                }
1079
                        }
1080
                        if !found && obj.GetAttrStr("hostName") == node {
2✔
1081
                                cont.fabricPathLogger(node, obj).
1✔
1082
                                        Debug("Appending opflex device")
1✔
1083

1✔
1084
                                devices = append(devices, obj)
1✔
1085
                                cont.nodeOpflexDevice[node] = devices
1✔
1086
                                nodeUpdates = append(nodeUpdates, node)
1✔
1087
                        }
1✔
1088
                }
1089
                if !nodefound {
2✔
1090
                        node := obj.GetAttrStr("hostName")
1✔
1091
                        cont.fabricPathLogger(node, obj).Debug("Adding opflex device")
1✔
1092
                        cont.nodeOpflexDevice[node] = apicapi.ApicSlice{obj}
1✔
1093
                        nodeUpdates = append(nodeUpdates, node)
1✔
1094
                }
1✔
1095
                cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", cont.nodeOpflexDevice[obj.GetAttrStr("hostName")])
1✔
1096
                cont.indexMutex.Unlock()
1✔
1097

1✔
1098
                for _, node := range nodeUpdates {
2✔
1099
                        cont.env.NodeServiceChanged(node)
1✔
1100
                        cont.erspanSyncOpflexDev()
1✔
1101
                }
1✔
1102
                cont.updateDeviceCluster()
1✔
1103

1104
        }
1105
}
1106

1107
func (cont *AciController) postOpflexDeviceDelete(nodes []string) {
1✔
1108
        cont.updateDeviceCluster()
1✔
1109
        for _, node := range nodes {
2✔
1110
                cont.env.NodeServiceChanged(node)
1✔
1111
                cont.erspanSyncOpflexDev()
1✔
1112
        }
1✔
1113
}
1114

1115
func (cont *AciController) opflexDeviceDeleted(dn string) {
1✔
1116
        var nodeUpdates []string
1✔
1117
        var dnFound bool //to check if the dn belongs to this cluster
1✔
1118
        cont.indexMutex.Lock()
1✔
1119
        for node, devices := range cont.nodeOpflexDevice {
2✔
1120
                for i, device := range devices {
2✔
1121
                        if device.GetDn() != dn {
2✔
1122
                                continue
1✔
1123
                        }
1124
                        dnFound = true
1✔
1125
                        cont.fabricPathLogger(node, device).
1✔
1126
                                Debug("Deleting opflex device path")
1✔
1127
                        devices = append(devices[:i], devices[i+1:]...)
1✔
1128
                        cont.nodeOpflexDevice[node] = devices
1✔
1129
                        nodeUpdates = append(nodeUpdates, node)
1✔
1130
                        break
1✔
1131
                }
1132
                if len(devices) == 0 {
2✔
1133
                        delete(cont.nodeOpflexDevice, node)
1✔
1134
                }
1✔
1135
        }
1136
        cont.indexMutex.Unlock()
1✔
1137

1✔
1138
        if dnFound {
2✔
1139
                cont.postOpflexDeviceDelete(nodeUpdates)
1✔
1140
        }
1✔
1141
}
1142

1143
func (cont *AciController) writeApicSvc(key string, service *v1.Service) {
1✔
1144
        aobj := apicapi.NewVmmInjectedSvc(cont.vmmDomainProvider(),
1✔
1145
                cont.config.AciVmmDomain, cont.config.AciVmmController,
1✔
1146
                service.Namespace, service.Name)
1✔
1147
        aobjDn := aobj.GetDn()
1✔
1148
        aobj.SetAttr("guid", string(service.UID))
1✔
1149

1✔
1150
        svcns := service.ObjectMeta.Namespace
1✔
1151
        _, exists, err := cont.namespaceIndexer.GetByKey(svcns)
1✔
1152
        if err != nil {
1✔
1153
                cont.log.Error("Failed to lookup ns : ", svcns, " ", err)
×
1154
                return
×
1155
        }
×
1156
        if !exists {
2✔
1157
                cont.log.Debug("Namespace of service ", service.ObjectMeta.Name, ": ", svcns, " doesn't exist, hence not sending an update to the APIC")
1✔
1158
                return
1✔
1159
        }
1✔
1160

1161
        if !cont.serviceEndPoints.SetServiceApicObject(aobj, service) {
2✔
1162
                return
1✔
1163
        }
1✔
1164
        var setApicSvcDnsName bool
1✔
1165
        if len(cont.config.ApicHosts) != 0 && apicapi.ApicVersion >= "5.1" {
1✔
1166
                setApicSvcDnsName = true
×
1167
        }
×
1168
        // APIC model only allows one of these
1169
        for _, ingress := range service.Status.LoadBalancer.Ingress {
1✔
1170
                if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1171
                        aobj.SetAttr("lbIp", ingress.IP)
×
1172
                } else if ingress.Hostname != "" {
×
1173
                        ipList, err := net.LookupHost(ingress.Hostname)
×
1174
                        if err == nil && len(ipList) > 0 {
×
1175
                                aobj.SetAttr("lbIp", ipList[0])
×
1176
                        } else {
×
1177
                                cont.log.Errorf("Lookup: err: %v, ipList: %+v", err, ipList)
×
1178
                        }
×
1179
                }
1180
                break
×
1181
        }
1182
        if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != "None" {
2✔
1183
                aobj.SetAttr("clusterIp", string(service.Spec.ClusterIP))
1✔
1184
        }
1✔
1185

1186
        var t string
1✔
1187
        switch service.Spec.Type {
1✔
1188
        case v1.ServiceTypeClusterIP:
×
1189
                t = "clusterIp"
×
1190
        case v1.ServiceTypeNodePort:
×
1191
                t = "nodePort"
×
1192
        case v1.ServiceTypeLoadBalancer:
1✔
1193
                t = "loadBalancer"
1✔
1194
        case v1.ServiceTypeExternalName:
×
1195
                t = "externalName"
×
1196
        }
1197
        if t != "" {
2✔
1198
                aobj.SetAttr("type", t)
1✔
1199
        }
1✔
1200

1201
        if setApicSvcDnsName || cont.config.Flavor == "k8s-overlay" {
1✔
1202
                dnsName := fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace)
×
1203

×
1204
                for _, ingress := range service.Status.LoadBalancer.Ingress {
×
1205
                        if ingress.Hostname != "" {
×
1206
                                aobj.SetAttr("dnsName", ingress.Hostname)
×
1207
                        } else if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1208
                                aobj.SetAttr("dnsName", dnsName)
×
1209
                        }
×
1210
                }
1211
                if t == "clusterIp" || t == "nodePort" || t == "externalName" {
×
1212
                        aobj.SetAttr("dnsName", dnsName)
×
1213
                }
×
1214
        }
1215
        for _, port := range service.Spec.Ports {
2✔
1216
                proto := getProtocolStr(port.Protocol)
1✔
1217
                p := apicapi.NewVmmInjectedSvcPort(aobjDn,
1✔
1218
                        strconv.Itoa(int(port.Port)), proto, port.TargetPort.String())
1✔
1219
                p.SetAttr("nodePort", strconv.Itoa(int(port.NodePort)))
1✔
1220
                aobj.AddChild(p)
1✔
1221
        }
1✔
1222
        if cont.config.EnableVmmInjectedLabels && service.ObjectMeta.Labels != nil && apicapi.ApicVersion >= "5.2" {
1✔
1223
                for key, val := range service.ObjectMeta.Labels {
×
1224
                        newLabelKey := cont.aciNameForKey("label", key)
×
1225
                        label := apicapi.NewVmmInjectedLabel(aobj.GetDn(),
×
1226
                                newLabelKey, val)
×
1227
                        aobj.AddChild(label)
×
1228
                }
×
1229
        }
1230
        name := cont.aciNameForKey("service-vmm", key)
1✔
1231
        cont.log.Debug("Write Service Object: ", aobj)
1✔
1232
        cont.apicConn.WriteApicObjects(name, apicapi.ApicSlice{aobj})
1✔
1233
        cont.log.Debugf("svcObject: %+v", aobj)
1✔
1234
}
1235

1236
func (cont *AciController) allocateServiceIps(servicekey string,
1237
        service *v1.Service) bool {
1✔
1238
        logger := serviceLogger(cont.log, service)
1✔
1239

1✔
1240
        cont.indexMutex.Lock()
1✔
1241
        meta, ok := cont.serviceMetaCache[servicekey]
1✔
1242
        if !ok {
2✔
1243
                meta = &serviceMeta{}
1✔
1244
                cont.serviceMetaCache[servicekey] = meta
1✔
1245

1✔
1246
                // Read any existing IPs and attempt to allocate them to the pod
1✔
1247
                for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1248
                        ip := net.ParseIP(ingress.IP)
1✔
1249
                        if ip == nil {
1✔
1250
                                continue
×
1251
                        }
1252
                        if ip.To4() != nil {
2✔
1253
                                if cont.serviceIps.GetV4IpCache()[0].RemoveIp(ip) {
2✔
1254
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1255
                                } else if cont.staticServiceIps.V4.RemoveIp(ip) {
3✔
1256
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1257
                                }
1✔
1258
                        } else if ip.To16() != nil {
2✔
1259
                                if cont.serviceIps.GetV6IpCache()[0].RemoveIp(ip) {
2✔
1260
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1261
                                } else if cont.staticServiceIps.V6.RemoveIp(ip) {
3✔
1262
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1263
                                }
1✔
1264
                        }
1265
                }
1266
        }
1267

1268
        if !cont.serviceSyncEnabled {
2✔
1269
                cont.indexMutex.Unlock()
1✔
1270
                return false
1✔
1271
        }
1✔
1272

1273
        // try to give the requested load balancer IP to the pod
1274
        requestedIp := net.ParseIP(service.Spec.LoadBalancerIP)
1✔
1275
        if requestedIp != nil {
2✔
1276
                hasRequestedIp := false
1✔
1277
                for _, ip := range meta.ingressIps {
2✔
1278
                        if reflect.DeepEqual(requestedIp, ip) {
1✔
1279
                                hasRequestedIp = true
×
1280
                        }
×
1281
                }
1282
                if !hasRequestedIp {
2✔
1283
                        if requestedIp.To4() != nil &&
1✔
1284
                                cont.staticServiceIps.V4.RemoveIp(requestedIp) {
2✔
1285
                                hasRequestedIp = true
1✔
1286
                        } else if requestedIp.To16() != nil &&
2✔
1287
                                cont.staticServiceIps.V6.RemoveIp(requestedIp) {
2✔
1288
                                hasRequestedIp = true
1✔
1289
                        }
1✔
1290
                }
1291
                if hasRequestedIp {
2✔
1292
                        cont.returnServiceIps(meta.ingressIps)
1✔
1293
                        meta.ingressIps = nil
1✔
1294
                        meta.staticIngressIps = []net.IP{requestedIp}
1✔
1295
                        meta.requestedIp = requestedIp
1✔
1296
                }
1✔
1297
        } else if meta.requestedIp != nil {
1✔
1298
                meta.requestedIp = nil
×
1299
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
×
1300
                meta.staticIngressIps = nil
×
1301
        }
×
1302

1303
        if len(meta.ingressIps) == 0 && len(meta.staticIngressIps) == 0 {
2✔
1304
                meta.ingressIps = []net.IP{}
1✔
1305

1✔
1306
                ipv4, _ := cont.serviceIps.AllocateIp(true)
1✔
1307
                if ipv4 != nil {
2✔
1308
                        meta.ingressIps = append(meta.ingressIps, ipv4)
1✔
1309
                }
1✔
1310
                ipv6, _ := cont.serviceIps.AllocateIp(false)
1✔
1311
                if ipv6 != nil {
2✔
1312
                        meta.ingressIps = append(meta.ingressIps, ipv6)
1✔
1313
                }
1✔
1314
                if ipv4 == nil && ipv6 == nil {
2✔
1315
                        logger.Error("No IP addresses available for service")
1✔
1316
                        cont.indexMutex.Unlock()
1✔
1317
                        return true
1✔
1318
                }
1✔
1319
        }
1320
        cont.indexMutex.Unlock()
1✔
1321

1✔
1322
        var newIngress []v1.LoadBalancerIngress
1✔
1323
        for _, ip := range meta.ingressIps {
2✔
1324
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
1325
        }
1✔
1326
        for _, ip := range meta.staticIngressIps {
2✔
1327
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
1328
        }
1✔
1329

1330
        if !reflect.DeepEqual(newIngress, service.Status.LoadBalancer.Ingress) {
2✔
1331
                service.Status.LoadBalancer.Ingress = newIngress
1✔
1332

1✔
1333
                _, err := cont.updateServiceStatus(service)
1✔
1334
                if err != nil {
1✔
1335
                        logger.Error("Failed to update service: ", err)
×
1336
                        return true
×
1337
                } else {
1✔
1338
                        logger.WithFields(logrus.Fields{
1✔
1339
                                "status": service.Status.LoadBalancer.Ingress,
1✔
1340
                        }).Info("Updated service load balancer status")
1✔
1341
                }
1✔
1342
        }
1343
        return false
1✔
1344
}
1345

1346
func (cont *AciController) handleServiceUpdate(service *v1.Service) bool {
1✔
1347
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
1348
        if err != nil {
1✔
1349
                serviceLogger(cont.log, service).
×
1350
                        Error("Could not create service key: ", err)
×
1351
                return false
×
1352
        }
×
1353

1354
        var requeue bool
1✔
1355
        isLoadBalancer := service.Spec.Type == v1.ServiceTypeLoadBalancer
1✔
1356
        aciLB := cont.config.LBType == lbTypeAci
1✔
1357
        if isLoadBalancer && aciLB {
2✔
1358
                if *cont.config.AllocateServiceIps {
2✔
1359
                        requeue = cont.allocateServiceIps(servicekey, service)
1✔
1360
                }
1✔
1361
                cont.indexMutex.Lock()
1✔
1362
                if cont.serviceSyncEnabled {
2✔
1363
                        cont.indexMutex.Unlock()
1✔
1364
                        err = cont.updateServiceDeviceInstance(servicekey, service)
1✔
1365
                        if err != nil {
1✔
1366
                                serviceLogger(cont.log, service).
×
1367
                                        Error("Failed to update service device Instance: ", err)
×
1368
                                return true
×
1369
                        }
×
1370
                } else {
1✔
1371
                        cont.indexMutex.Unlock()
1✔
1372
                }
1✔
1373
        } else if aciLB {
2✔
1374
                cont.clearLbService(servicekey)
1✔
1375
        }
1✔
1376

1377
        cont.writeApicSvc(servicekey, service)
1✔
1378
        return requeue
1✔
1379
}
1380

1381
func (cont *AciController) clearLbService(servicekey string) {
1✔
1382
        cont.indexMutex.Lock()
1✔
1383
        if meta, ok := cont.serviceMetaCache[servicekey]; ok {
2✔
1384
                cont.returnServiceIps(meta.ingressIps)
1✔
1385
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
1✔
1386
                delete(cont.serviceMetaCache, servicekey)
1✔
1387
        }
1✔
1388
        cont.indexMutex.Unlock()
1✔
1389
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("svc", servicekey))
1✔
1390
}
1391

1392
func getEndpointsIps(endpoints *v1.Endpoints) map[string]bool {
1✔
1393
        ips := make(map[string]bool)
1✔
1394
        for _, subset := range endpoints.Subsets {
2✔
1395
                for _, addr := range subset.Addresses {
2✔
1396
                        ips[addr.IP] = true
1✔
1397
                }
1✔
1398
                for _, addr := range subset.NotReadyAddresses {
1✔
1399
                        ips[addr.IP] = true
×
1400
                }
×
1401
        }
1402
        return ips
1✔
1403
}
1404

1405
func servicePortKey(p *v1.ServicePort) string {
×
1406
        return portProto(&p.Protocol) + "-num-" + strconv.Itoa(int(p.Port))
×
1407
}
×
1408

1409
func getServiceTargetPorts(service *v1.Service) map[string]targetPort {
1✔
1410
        ports := make(map[string]targetPort)
1✔
1411
        for _, port := range service.Spec.Ports {
2✔
1412
                portNum := port.TargetPort.IntValue()
1✔
1413
                if portNum <= 0 {
2✔
1414
                        portNum = int(port.Port)
1✔
1415
                }
1✔
1416
                key := portProto(&port.Protocol) + "-num-" + strconv.Itoa(int(portNum))
1✔
1417
                ports[key] = targetPort{
1✔
1418
                        proto: port.Protocol,
1✔
1419
                        ports: []int{portNum},
1✔
1420
                }
1✔
1421
        }
1422
        return ports
1✔
1423
}
1424

1425
func (cont *AciController) endpointsAdded(obj interface{}) {
1✔
1426
        endpoints := obj.(*v1.Endpoints)
1✔
1427
        servicekey, err := cache.MetaNamespaceKeyFunc(obj.(*v1.Endpoints))
1✔
1428
        if err != nil {
1✔
1429
                cont.log.Error("Could not create service key: ", err)
×
1430
                return
×
1431
        }
×
1432

1433
        ips := getEndpointsIps(endpoints)
1✔
1434
        cont.indexMutex.Lock()
1✔
1435
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
1436
        cont.queueIPNetPolUpdates(ips)
1✔
1437
        cont.indexMutex.Unlock()
1✔
1438

1✔
1439
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
1440

1✔
1441
        cont.queueServiceUpdateByKey(servicekey)
1✔
1442
}
1443

1444
func (cont *AciController) endpointsDeleted(obj interface{}) {
1✔
1445
        endpoints, isEndpoints := obj.(*v1.Endpoints)
1✔
1446
        if !isEndpoints {
1✔
1447
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
1448
                if !ok {
×
1449
                        cont.log.Error("Received unexpected object: ", obj)
×
1450
                        return
×
1451
                }
×
1452
                endpoints, ok = deletedState.Obj.(*v1.Endpoints)
×
1453
                if !ok {
×
1454
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpoints object: ", deletedState.Obj)
×
1455
                        return
×
1456
                }
×
1457
        }
1458
        servicekey, err := cache.MetaNamespaceKeyFunc(endpoints)
1✔
1459
        if err != nil {
1✔
1460
                cont.log.Error("Could not create service key: ", err)
×
1461
                return
×
1462
        }
×
1463

1464
        ips := getEndpointsIps(endpoints)
1✔
1465
        cont.indexMutex.Lock()
1✔
1466
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
1467
        cont.queueIPNetPolUpdates(ips)
1✔
1468
        cont.indexMutex.Unlock()
1✔
1469

1✔
1470
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
1471

1✔
1472
        cont.queueServiceUpdateByKey(servicekey)
1✔
1473
}
1474

1475
func (cont *AciController) endpointsUpdated(old interface{}, new interface{}) {
1✔
1476
        oldendpoints := old.(*v1.Endpoints)
1✔
1477
        newendpoints := new.(*v1.Endpoints)
1✔
1478
        servicekey, err := cache.MetaNamespaceKeyFunc(newendpoints)
1✔
1479
        if err != nil {
1✔
1480
                cont.log.Error("Could not create service key: ", err)
×
1481
                return
×
1482
        }
×
1483

1484
        oldIps := getEndpointsIps(oldendpoints)
1✔
1485
        newIps := getEndpointsIps(newendpoints)
1✔
1486
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
1487
                cont.indexMutex.Lock()
1✔
1488
                cont.queueIPNetPolUpdates(oldIps)
1✔
1489
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
1490
                cont.queueIPNetPolUpdates(newIps)
1✔
1491
                cont.indexMutex.Unlock()
1✔
1492
        }
1✔
1493

1494
        if !reflect.DeepEqual(oldendpoints.Subsets, newendpoints.Subsets) {
2✔
1495
                cont.queueEndpointsNetPolUpdates(oldendpoints)
1✔
1496
                cont.queueEndpointsNetPolUpdates(newendpoints)
1✔
1497
        }
1✔
1498

1499
        cont.queueServiceUpdateByKey(servicekey)
1✔
1500
}
1501

1502
func (cont *AciController) serviceAdded(obj interface{}) {
1✔
1503
        service := obj.(*v1.Service)
1✔
1504
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
1505
        if err != nil {
1✔
1506
                serviceLogger(cont.log, service).
×
1507
                        Error("Could not create service key: ", err)
×
1508
                return
×
1509
        }
×
1510

1511
        ports := getServiceTargetPorts(service)
1✔
1512
        cont.indexMutex.Lock()
1✔
1513
        cont.queuePortNetPolUpdates(ports)
1✔
1514
        cont.updateTargetPortIndex(true, servicekey, nil, ports)
1✔
1515
        cont.indexMutex.Unlock()
1✔
1516

1✔
1517
        cont.queueServiceUpdateByKey(servicekey)
1✔
1518
}
1519

1520
func (cont *AciController) serviceUpdated(old interface{}, new interface{}) {
1✔
1521
        oldservice := old.(*v1.Service)
1✔
1522
        newservice := new.(*v1.Service)
1✔
1523
        servicekey, err := cache.MetaNamespaceKeyFunc(newservice)
1✔
1524
        if err != nil {
1✔
1525
                serviceLogger(cont.log, newservice).
×
1526
                        Error("Could not create service key: ", err)
×
1527
                return
×
1528
        }
×
1529
        oldPorts := getServiceTargetPorts(oldservice)
1✔
1530
        newPorts := getServiceTargetPorts(newservice)
1✔
1531
        if !reflect.DeepEqual(oldPorts, newPorts) {
1✔
1532
                cont.indexMutex.Lock()
×
1533
                cont.queuePortNetPolUpdates(oldPorts)
×
1534
                cont.updateTargetPortIndex(true, servicekey, oldPorts, newPorts)
×
1535
                cont.queuePortNetPolUpdates(newPorts)
×
1536
                cont.indexMutex.Unlock()
×
1537
        }
×
1538

1539
        cont.queueServiceUpdateByKey(servicekey)
1✔
1540
}
1541

1542
func (cont *AciController) serviceDeleted(obj interface{}) {
1✔
1543
        service, isService := obj.(*v1.Service)
1✔
1544
        if !isService {
1✔
1545
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
1546
                if !ok {
×
1547
                        serviceLogger(cont.log, service).
×
1548
                                Error("Received unexpected object: ", obj)
×
1549
                        return
×
1550
                }
×
1551
                service, ok = deletedState.Obj.(*v1.Service)
×
1552
                if !ok {
×
1553
                        serviceLogger(cont.log, service).
×
1554
                                Error("DeletedFinalStateUnknown contained non-Services object: ", deletedState.Obj)
×
1555
                        return
×
1556
                }
×
1557
        }
1558
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
1559
        if err != nil {
1✔
1560
                serviceLogger(cont.log, service).
×
1561
                        Error("Could not create service key: ", err)
×
1562
                return
×
1563
        }
×
1564
        cont.clearLbService(servicekey)
1✔
1565
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("service-vmm",
1✔
1566
                servicekey))
1✔
1567

1✔
1568
        ports := getServiceTargetPorts(service)
1✔
1569
        cont.indexMutex.Lock()
1✔
1570
        cont.updateTargetPortIndex(true, servicekey, ports, nil)
1✔
1571
        cont.queuePortNetPolUpdates(ports)
1✔
1572
        delete(cont.snatServices, servicekey)
1✔
1573
        cont.indexMutex.Unlock()
1✔
1574
}
1575

1576
func (cont *AciController) serviceFullSync() {
1✔
1577
        cache.ListAll(cont.serviceIndexer, labels.Everything(),
1✔
1578
                func(sobj interface{}) {
2✔
1579
                        cont.queueServiceUpdate(sobj.(*v1.Service))
1✔
1580
                })
1✔
1581
}
1582

1583
func (cont *AciController) getEndpointSliceIps(endpointSlice *v1beta1.EndpointSlice) map[string]bool {
1✔
1584
        ips := make(map[string]bool)
1✔
1585
        for _, endpoints := range endpointSlice.Endpoints {
2✔
1586
                for _, addr := range endpoints.Addresses {
2✔
1587
                        ips[addr] = true
1✔
1588
                }
1✔
1589
        }
1590
        return ips
1✔
1591
}
1592

1593
func (cont *AciController) notReadyEndpointPresent(endpointSlice *v1beta1.EndpointSlice) bool {
×
1594
        for _, endpoints := range endpointSlice.Endpoints {
×
1595
                if (endpoints.Conditions.Ready != nil && !*endpoints.Conditions.Ready) &&
×
1596
                        (endpoints.Conditions.Terminating != nil && !*endpoints.Conditions.Terminating) {
×
1597
                        return true
×
1598
                }
×
1599
        }
1600
        return false
×
1601
}
1602

1603
func (cont *AciController) getEndpointSliceEpIps(endpoints v1beta1.Endpoint) map[string]bool {
×
1604
        ips := make(map[string]bool)
×
1605
        for _, addr := range endpoints.Addresses {
×
1606
                ips[addr] = true
×
1607
        }
×
1608
        return ips
×
1609
}
1610

1611
func (cont *AciController) processDelayedEpSlices() {
1✔
1612
        var processEps []DelayedEpSlice
1✔
1613
        cont.indexMutex.Lock()
1✔
1614
        for i := 0; i < len(cont.delayedEpSlices); i++ {
1✔
1615
                delayedepslice := cont.delayedEpSlices[i]
×
1616
                if time.Now().After(delayedepslice.DelayedTime) {
×
1617
                        var toprocess DelayedEpSlice
×
1618
                        err := util.DeepCopyObj(&delayedepslice, &toprocess)
×
1619
                        if err != nil {
×
1620
                                cont.log.Error(err)
×
1621
                                continue
×
1622
                        }
1623
                        processEps = append(processEps, toprocess)
×
1624
                        cont.delayedEpSlices = append(cont.delayedEpSlices[:i], cont.delayedEpSlices[i+1:]...)
×
1625
                }
1626
        }
1627

1628
        cont.indexMutex.Unlock()
1✔
1629
        for _, epslice := range processEps {
1✔
1630
                //ignore the epslice if newly added endpoint is not ready
×
1631
                if cont.notReadyEndpointPresent(epslice.NewEpSlice) {
×
1632
                        cont.log.Debug("Ignoring the update as the new endpoint is not ready : ", epslice.NewEpSlice)
×
1633
                } else {
×
1634
                        cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice)
×
1635
                        cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice)
×
1636
                }
×
1637
        }
1638
}
1639

1640
func (cont *AciController) endpointSliceAdded(obj interface{}) {
1✔
1641
        endpointslice, ok := obj.(*v1beta1.EndpointSlice)
1✔
1642
        if !ok {
1✔
1643
                cont.log.Error("error processing Endpointslice object: ", obj)
×
1644
                return
×
1645
        }
×
1646
        servicekey, valid := getServiceKey(endpointslice)
1✔
1647
        if !valid {
1✔
1648
                return
×
1649
        }
×
1650
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
1651
        cont.indexMutex.Lock()
1✔
1652
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
1653
        cont.queueIPNetPolUpdates(ips)
1✔
1654
        cont.indexMutex.Unlock()
1✔
1655

1✔
1656
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
1657

1✔
1658
        cont.queueServiceUpdateByKey(servicekey)
1✔
1659
        cont.log.Info("EndPointSlice Object Added: ", servicekey)
1✔
1660
}
1661

1662
func (cont *AciController) endpointSliceDeleted(obj interface{}) {
1✔
1663
        endpointslice, isEndpointslice := obj.(*v1beta1.EndpointSlice)
1✔
1664
        if !isEndpointslice {
1✔
1665
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
1666
                if !ok {
×
1667
                        cont.log.Error("Received unexpected object: ", obj)
×
1668
                        return
×
1669
                }
×
1670
                endpointslice, ok = deletedState.Obj.(*v1beta1.EndpointSlice)
×
1671
                if !ok {
×
1672
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpointslice object: ", deletedState.Obj)
×
1673
                        return
×
1674
                }
×
1675
        }
1676
        servicekey, valid := getServiceKey(endpointslice)
1✔
1677
        if !valid {
1✔
1678
                return
×
1679
        }
×
1680
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
1681
        cont.indexMutex.Lock()
1✔
1682
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
1683
        cont.queueIPNetPolUpdates(ips)
1✔
1684
        cont.indexMutex.Unlock()
1✔
1685
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
1686
        cont.queueServiceUpdateByKey(servicekey)
1✔
1687
}
1688

1689
// Checks if the given service is present in the user configured list of services
1690
// for pbr delay and if present, returns the servie specific delay if configured
1691
func (cont *AciController) svcInAddDelayList(name, ns string) (int, bool) {
×
1692
        for _, svc := range cont.config.ServiceGraphEndpointAddDelay.Services {
×
1693
                if svc.Name == name && svc.Namespace == ns {
×
1694
                        return svc.Delay, true
×
1695
                }
×
1696
        }
1697
        return 0, false
×
1698
}
1699

1700
// Check if the endpointslice update notification has any deletion of enpoint
1701
func (cont *AciController) isDeleteEndpointSlice(oldendpointslice, newendpointslice *v1beta1.EndpointSlice) bool {
×
1702
        del := false
×
1703

×
1704
        // if any endpoint is removed from endpontslice
×
1705
        if len(newendpointslice.Endpoints) < len(newendpointslice.Endpoints) {
×
1706
                del = true
×
1707
        }
×
1708

1709
        if !del {
×
1710
                // if any one of the endpoint is in terminating state
×
1711
                for _, endpoint := range newendpointslice.Endpoints {
×
1712
                        if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
×
1713
                                del = true
×
1714
                                break
×
1715
                        }
1716
                }
1717
        }
1718
        if !del {
×
1719
                // if any one of endpoint moved from ready state to not-ready state
×
1720
                for _, oldendpoint := range oldendpointslice.Endpoints {
×
1721
                        oldips := cont.getEndpointSliceEpIps(oldendpoint)
×
1722
                        for _, newendpoint := range newendpointslice.Endpoints {
×
1723
                                newips := cont.getEndpointSliceEpIps(newendpoint)
×
1724
                                if reflect.DeepEqual(oldips, newips) {
×
1725
                                        if (oldendpoint.Conditions.Ready != nil && *oldendpoint.Conditions.Ready) &&
×
1726
                                                (newendpoint.Conditions.Ready != nil && !*newendpoint.Conditions.Ready) {
×
1727
                                                del = true
×
1728
                                        }
×
1729
                                        break
×
1730
                                }
1731
                        }
1732
                }
1733
        }
1734
        return del
×
1735
}
1736

1737
func (cont *AciController) doendpointSliceUpdatedDelay(oldendpointslice *v1beta1.EndpointSlice,
1738
        newendpointslice *v1beta1.EndpointSlice) {
×
1739
        svc, ns, valid := getServiceNameAndNs(newendpointslice)
×
1740
        if !valid {
×
1741
                return
×
1742
        }
×
1743
        svckey, valid := getServiceKey(newendpointslice)
×
1744
        if !valid {
×
1745
                return
×
1746
        }
×
1747
        delay := cont.config.ServiceGraphEndpointAddDelay.Delay
×
1748
        svcDelay, exists := cont.svcInAddDelayList(svc, ns)
×
1749
        if svcDelay > 0 {
×
1750
                delay = svcDelay
×
1751
        }
×
1752
        var delayedsvc bool
×
1753
        delayedsvc = exists && delay > 0
×
1754
        if delayedsvc {
×
1755
                cont.log.Debug("Delay of ", delay, " seconds is applicable for svc :", svc, " in ns: ", ns)
×
1756
                var delayedepslice DelayedEpSlice
×
1757
                delayedepslice.OldEpSlice = oldendpointslice
×
1758
                delayedepslice.ServiceKey = svckey
×
1759
                delayedepslice.NewEpSlice = newendpointslice
×
1760
                currentTime := time.Now()
×
1761
                delayedepslice.DelayedTime = currentTime.Add(time.Duration(delay) * time.Second)
×
1762
                cont.indexMutex.Lock()
×
1763
                cont.delayedEpSlices = append(cont.delayedEpSlices, &delayedepslice)
×
1764
                cont.indexMutex.Unlock()
×
1765
        } else {
×
1766
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
1767
        }
×
1768

1769
        if delayedsvc && cont.isDeleteEndpointSlice(oldendpointslice, newendpointslice) {
×
1770
                cont.log.Debug("Proceeding by ignoring delay as the update is due to delete of endpoint")
×
1771
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
1772
        }
×
1773
        return
×
1774
}
1775
func (cont *AciController) endpointSliceUpdated(oldobj interface{}, newobj interface{}) {
1✔
1776
        oldendpointslice, ok := oldobj.(*v1beta1.EndpointSlice)
1✔
1777
        if !ok {
1✔
1778
                cont.log.Error("error processing Endpointslice object: ", oldobj)
×
1779
                return
×
1780
        }
×
1781
        newendpointslice, ok := newobj.(*v1beta1.EndpointSlice)
1✔
1782
        if !ok {
1✔
1783
                cont.log.Error("error processing Endpointslice object: ", newobj)
×
1784
                return
×
1785
        }
×
1786
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
1787
                cont.doendpointSliceUpdatedDelay(oldendpointslice, newendpointslice)
×
1788
        } else {
1✔
1789
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
1✔
1790
        }
1✔
1791
}
1792

1793
func (cont *AciController) doendpointSliceUpdated(oldendpointslice *v1beta1.EndpointSlice,
1794
        newendpointslice *v1beta1.EndpointSlice) {
1✔
1795
        servicekey, valid := getServiceKey(newendpointslice)
1✔
1796
        if !valid {
1✔
1797
                return
×
1798
        }
×
1799
        oldIps := cont.getEndpointSliceIps(oldendpointslice)
1✔
1800
        newIps := cont.getEndpointSliceIps(newendpointslice)
1✔
1801
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
1802
                cont.indexMutex.Lock()
1✔
1803
                cont.queueIPNetPolUpdates(oldIps)
1✔
1804
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
1805
                cont.queueIPNetPolUpdates(newIps)
1✔
1806
                cont.indexMutex.Unlock()
1✔
1807
        }
1✔
1808

1809
        if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) {
2✔
1810
                cont.queueEndpointSliceNetPolUpdates(oldendpointslice)
1✔
1811
                cont.queueEndpointSliceNetPolUpdates(newendpointslice)
1✔
1812
        }
1✔
1813
        cont.log.Debug("EndPointSlice Object Update: ", servicekey)
1✔
1814
        cont.queueServiceUpdateByKey(servicekey)
1✔
1815
}
1816

1817
func (cont *AciController) queueEndpointSliceNetPolUpdates(endpointslice *v1beta1.EndpointSlice) {
1✔
1818
        for _, endpoint := range endpointslice.Endpoints {
2✔
1819
                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" ||
1✔
1820
                        endpoint.TargetRef.Namespace == "" || endpoint.TargetRef.Name == "" {
2✔
1821
                        continue
1✔
1822
                }
1823
                if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
1✔
1824
                        continue
×
1825
                }
1826
                podkey := endpoint.TargetRef.Namespace + "/" + endpoint.TargetRef.Name
1✔
1827
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
1828
                ps := make(map[string]bool)
1✔
1829
                for _, npkey := range npkeys {
2✔
1830
                        cont.queueNetPolUpdateByKey(npkey)
1✔
1831
                }
1✔
1832
                // Process if the  any matching namedport wildcard policy is present
1833
                // ignore np already processed policies
1834
                cont.queueMatchingNamedNp(ps, podkey)
1✔
1835
        }
1836
}
1837

1838
func getServiceKey(endPointSlice *v1beta1.EndpointSlice) (string, bool) {
1✔
1839
        serviceName, ok := endPointSlice.Labels[v1beta1.LabelServiceName]
1✔
1840
        if !ok {
1✔
1841
                return "", false
×
1842
        }
×
1843
        return endPointSlice.ObjectMeta.Namespace + "/" + serviceName, true
1✔
1844
}
1845

1846
func getServiceNameAndNs(endPointSlice *v1beta1.EndpointSlice) (string, string, bool) {
×
1847
        serviceName, ok := endPointSlice.Labels[v1beta1.LabelServiceName]
×
1848
        if !ok {
×
1849
                return "", "", false
×
1850
        }
×
1851
        return serviceName, endPointSlice.ObjectMeta.Namespace, true
×
1852
}
1853

1854
// can be called with index lock
1855
func (sep *serviceEndpoint) UpdateServicesForNode(nodename string) {
1✔
1856
        cont := sep.cont
1✔
1857
        cache.ListAll(cont.endpointsIndexer, labels.Everything(),
1✔
1858
                func(endpointsobj interface{}) {
2✔
1859
                        endpoints := endpointsobj.(*v1.Endpoints)
1✔
1860
                        for _, subset := range endpoints.Subsets {
2✔
1861
                                for _, addr := range subset.Addresses {
2✔
1862
                                        if addr.NodeName != nil && *addr.NodeName == nodename {
2✔
1863
                                                servicekey, err :=
1✔
1864
                                                        cache.MetaNamespaceKeyFunc(endpointsobj.(*v1.Endpoints))
1✔
1865
                                                if err != nil {
1✔
1866
                                                        cont.log.Error("Could not create endpoints key: ", err)
×
1867
                                                        return
×
1868
                                                }
×
1869
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
1870
                                                return
1✔
1871
                                        }
1872
                                }
1873
                        }
1874
                })
1875
}
1876

1877
func (seps *serviceEndpointSlice) UpdateServicesForNode(nodename string) {
1✔
1878
        // 1. List all the endpointslice and check for matching nodename
1✔
1879
        // 2. if it matches trigger the Service update and mark it visited
1✔
1880
        cont := seps.cont
1✔
1881
        visited := make(map[string]bool)
1✔
1882
        cache.ListAll(cont.endpointSliceIndexer, labels.Everything(),
1✔
1883
                func(endpointSliceobj interface{}) {
2✔
1884
                        endpointSlices := endpointSliceobj.(*v1beta1.EndpointSlice)
1✔
1885
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
1886
                                _, ok := endpoint.Topology["kubernetes.io/hostname"]
1✔
1887
                                if ok && endpoint.Topology["kubernetes.io/hostname"] == nodename {
2✔
1888
                                        servicekey, valid := getServiceKey(endpointSlices)
1✔
1889
                                        if !valid {
1✔
1890
                                                return
×
1891
                                        }
×
1892
                                        if _, ok := visited[servicekey]; !ok {
2✔
1893
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
1894
                                                visited[servicekey] = true
1✔
1895
                                                return
1✔
1896
                                        }
1✔
1897
                                }
1898
                        }
1899
                })
1900
}
1901
func (cont *AciController) setNodeMap(nodeMap map[string]*metadata.ServiceEndpoint, nodeName string) {
1✔
1902
        nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
1903
        if !ok {
1✔
1904
                return
×
1905
        }
×
1906
        _, ok = cont.fabricPathForNode(nodeName)
1✔
1907
        if !ok {
2✔
1908
                return
1✔
1909
        }
1✔
1910
        nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
1911

1912
}
1913

1914
// 2 cases when epslices corresponding to given service is presnt in delayedEpSlices:
1915
//  1. endpoint not present in delayedEpSlices of the service
1916
//  2. endpoint present in delayedEpSlices of the service but in not ready state
1917
//
1918
// indexMutex lock must be acquired before calling the function
1919
func (cont *AciController) isDelayedEndpoint(endpoint v1beta1.Endpoint, svckey string) bool {
×
1920
        delayed := false
×
1921
        endpointips := cont.getEndpointSliceEpIps(endpoint)
×
1922
        for _, delayedepslices := range cont.delayedEpSlices {
×
1923
                if delayedepslices.ServiceKey == svckey {
×
1924
                        var found bool
×
1925
                        epslice := delayedepslices.OldEpSlice
×
1926
                        for _, ep := range epslice.Endpoints {
×
1927
                                epips := cont.getEndpointSliceEpIps(ep)
×
1928
                                if reflect.DeepEqual(endpointips, epips) {
×
1929
                                        // case 2
×
1930
                                        if ep.Conditions.Ready != nil && !*ep.Conditions.Ready {
×
1931
                                                delayed = true
×
1932
                                        }
×
1933
                                        found = true
×
1934
                                }
1935
                        }
1936
                        // case 1
1937
                        if !found {
×
1938
                                delayed = true
×
1939
                        }
×
1940
                }
1941
        }
1942
        return delayed
×
1943
}
1944

1945
// set nodemap only if endoint is ready and not in delayedEpSlices
1946
func (cont *AciController) setNodeMapDelay(nodeMap map[string]*metadata.ServiceEndpoint,
1947
        endpoint v1beta1.Endpoint, service *v1.Service) {
×
1948

×
1949
        svckey, err := cache.MetaNamespaceKeyFunc(service)
×
1950
        if err != nil {
×
1951
                cont.log.Error("Could not create service key: ", err)
×
1952
                return
×
1953
        }
×
1954
        if cont.config.NoWaitForServiceEpReadiness ||
×
1955
                (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {
×
1956

×
1957
                nodeName, ok := endpoint.Topology["kubernetes.io/hostname"]
×
1958
                if ok {
×
1959
                        // donot setNodeMap for endpoint if:
×
1960
                        //   endpoint is newly added
×
1961
                        //   endpoint status changed from not ready to ready
×
1962
                        if !cont.isDelayedEndpoint(endpoint, svckey) {
×
1963
                                cont.setNodeMap(nodeMap, nodeName)
×
1964
                        }
×
1965
                }
1966
        }
1967
}
1968

1969
func (sep *serviceEndpoint) GetnodesMetadata(key string,
1970
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
1971
        cont := sep.cont
1✔
1972
        endpointsobj, exists, err := cont.endpointsIndexer.GetByKey(key)
1✔
1973
        if err != nil {
1✔
1974
                cont.log.Error("Could not lookup endpoints for " +
×
1975
                        key + ": " + err.Error())
×
1976
        }
×
1977
        if exists && endpointsobj != nil {
2✔
1978
                endpoints := endpointsobj.(*v1.Endpoints)
1✔
1979
                for _, subset := range endpoints.Subsets {
2✔
1980
                        for _, addr := range subset.Addresses {
2✔
1981
                                if addr.NodeName == nil {
2✔
1982
                                        continue
1✔
1983
                                }
1984
                                cont.setNodeMap(nodeMap, *addr.NodeName)
1✔
1985
                        }
1986
                }
1987
        }
1988
        cont.log.Info("NodeMap: ", nodeMap)
1✔
1989
}
1990

1991
func (seps *serviceEndpointSlice) GetnodesMetadata(key string,
1992
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
1993
        cont := seps.cont
1✔
1994
        // 1. Get all the Endpoint slices matching the label service-name
1✔
1995
        // 2. update the node map matching with endpoints nodes name
1✔
1996
        label := map[string]string{"kubernetes.io/service-name": service.ObjectMeta.Name}
1✔
1997
        selector := labels.SelectorFromSet(labels.Set(label))
1✔
1998
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
1999
                func(endpointSliceobj interface{}) {
2✔
2000
                        endpointSlices := endpointSliceobj.(*v1beta1.EndpointSlice)
1✔
2001
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2002
                                if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2003
                                        cont.setNodeMapDelay(nodeMap, endpoint, service)
×
2004
                                } else if cont.config.NoWaitForServiceEpReadiness ||
1✔
2005
                                        (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {
2✔
2006
                                        nodeName, ok := endpoint.Topology["kubernetes.io/hostname"]
1✔
2007
                                        if ok {
2✔
2008
                                                cont.setNodeMap(nodeMap, nodeName)
1✔
2009
                                        }
1✔
2010
                                }
2011
                        }
2012
                })
2013
        cont.log.Debug("NodeMap: ", nodeMap)
1✔
2014
}
2015

2016
func (sep *serviceEndpoint) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2017
        cont := sep.cont
1✔
2018
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
2019
        if err != nil {
1✔
2020
                serviceLogger(cont.log, service).
×
2021
                        Error("Could not create service key: ", err)
×
2022
                return false
×
2023
        }
×
2024
        endpointsobj, _, err := cont.endpointsIndexer.GetByKey(key)
1✔
2025
        if err != nil {
1✔
2026
                cont.log.Error("Could not lookup endpoints for " +
×
2027
                        key + ": " + err.Error())
×
2028
                return false
×
2029
        }
×
2030
        if endpointsobj != nil {
2✔
2031
                for _, subset := range endpointsobj.(*v1.Endpoints).Subsets {
2✔
2032
                        for _, addr := range subset.Addresses {
2✔
2033
                                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" {
1✔
2034
                                        continue
×
2035
                                }
2036
                                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(),
1✔
2037
                                        addr.TargetRef.Name))
1✔
2038
                        }
2039
                }
2040
        }
2041
        return true
1✔
2042
}
2043

2044
func (seps *serviceEndpointSlice) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2045
        cont := seps.cont
1✔
2046
        label := map[string]string{"kubernetes.io/service-name": service.ObjectMeta.Name}
1✔
2047
        selector := labels.SelectorFromSet(labels.Set(label))
1✔
2048
        epcount := 0
1✔
2049
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2050
                func(endpointSliceobj interface{}) {
2✔
2051
                        endpointSlices := endpointSliceobj.(*v1beta1.EndpointSlice)
1✔
2052
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2053
                                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
1✔
2054
                                        continue
×
2055
                                }
2056
                                epcount++
1✔
2057
                                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(),
1✔
2058
                                        endpoint.TargetRef.Name))
1✔
2059
                                cont.log.Debug("EndPoint added: ", endpoint.TargetRef.Name)
1✔
2060
                        }
2061
                })
2062
        return epcount != 0
1✔
2063
}
2064
func getProtocolStr(proto v1.Protocol) string {
1✔
2065
        var protostring string
1✔
2066
        switch proto {
1✔
2067
        case v1.ProtocolUDP:
1✔
2068
                protostring = "udp"
1✔
2069
        case v1.ProtocolTCP:
1✔
2070
                protostring = "tcp"
1✔
2071
        case v1.ProtocolSCTP:
×
2072
                protostring = "sctp"
×
2073
        default:
×
2074
                protostring = "tcp"
×
2075
        }
2076
        return protostring
1✔
2077
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc