• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 8274

06 Dec 2023 10:05AM UTC coverage: 53.692% (-0.1%) from 53.79%
8274

Pull #1206

travis-pro

web-flow
Merge cbf87aef6 into afa25a6a5
Pull Request #1206: Added paremeter enable-opflex-agent-reconnect

5 of 29 new or added lines in 3 files covered. (17.24%)

17 existing lines in 3 files now uncovered.

13263 of 24702 relevant lines covered (53.69%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.19
/pkg/controller/services.go
1
// Copyright 2016 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "errors"
19
        "fmt"
20
        "net"
21
        "reflect"
22
        "sort"
23
        "strconv"
24
        "strings"
25
        "time"
26

27
        "github.com/noironetworks/aci-containers/pkg/apicapi"
28
        "github.com/noironetworks/aci-containers/pkg/metadata"
29
        "github.com/noironetworks/aci-containers/pkg/util"
30
        "github.com/sirupsen/logrus"
31
        v1 "k8s.io/api/core/v1"
32
        discovery "k8s.io/api/discovery/v1"
33
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34
        "k8s.io/apimachinery/pkg/fields"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/client-go/kubernetes"
37
        "k8s.io/client-go/tools/cache"
38
)
39

40
// Default service contract scope value
41
const DefaultServiceContractScope = "context"
42

43
// Default service ext subnet scope - enable shared security
44
const DefaultServiceExtSubNetShared = false
45

46
func (cont *AciController) initEndpointsInformerFromClient(
47
        kubeClient kubernetes.Interface) {
×
48
        cont.initEndpointsInformerBase(
×
49
                cache.NewListWatchFromClient(
×
50
                        kubeClient.CoreV1().RESTClient(), "endpoints",
×
51
                        metav1.NamespaceAll, fields.Everything()))
×
52
}
×
53

54
func (cont *AciController) initEndpointSliceInformerFromClient(
55
        kubeClient kubernetes.Interface) {
×
56
        cont.initEndpointSliceInformerBase(
×
57
                cache.NewListWatchFromClient(
×
58
                        kubeClient.DiscoveryV1().RESTClient(), "endpointslices",
×
59
                        metav1.NamespaceAll, fields.Everything()))
×
60
}
×
61

62
func (cont *AciController) initEndpointSliceInformerBase(listWatch *cache.ListWatch) {
1✔
63
        cont.endpointSliceIndexer, cont.endpointSliceInformer = cache.NewIndexerInformer(
1✔
64
                listWatch, &discovery.EndpointSlice{}, 0,
1✔
65
                cache.ResourceEventHandlerFuncs{
1✔
66
                        AddFunc: func(obj interface{}) {
2✔
67
                                cont.endpointSliceAdded(obj)
1✔
68
                        },
1✔
69
                        UpdateFunc: func(oldobj interface{}, newobj interface{}) {
1✔
70
                                cont.endpointSliceUpdated(oldobj, newobj)
1✔
71
                        },
1✔
72
                        DeleteFunc: func(obj interface{}) {
1✔
73
                                cont.endpointSliceDeleted(obj)
1✔
74
                        },
1✔
75
                },
76
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
77
        )
78
}
79

80
func (cont *AciController) initEndpointsInformerBase(listWatch *cache.ListWatch) {
1✔
81
        cont.endpointsIndexer, cont.endpointsInformer = cache.NewIndexerInformer(
1✔
82
                listWatch, &v1.Endpoints{}, 0,
1✔
83
                cache.ResourceEventHandlerFuncs{
1✔
84
                        AddFunc: func(obj interface{}) {
2✔
85
                                cont.endpointsAdded(obj)
1✔
86
                        },
1✔
87
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
88
                                cont.endpointsUpdated(old, new)
1✔
89
                        },
1✔
90
                        DeleteFunc: func(obj interface{}) {
1✔
91
                                cont.endpointsDeleted(obj)
1✔
92
                        },
1✔
93
                },
94
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
95
        )
96
}
97

98
func (cont *AciController) initServiceInformerFromClient(
99
        kubeClient *kubernetes.Clientset) {
×
100
        cont.initServiceInformerBase(
×
101
                cache.NewListWatchFromClient(
×
102
                        kubeClient.CoreV1().RESTClient(), "services",
×
103
                        metav1.NamespaceAll, fields.Everything()))
×
104
}
×
105

106
func (cont *AciController) initServiceInformerBase(listWatch *cache.ListWatch) {
1✔
107
        cont.serviceIndexer, cont.serviceInformer = cache.NewIndexerInformer(
1✔
108
                listWatch, &v1.Service{}, 0,
1✔
109
                cache.ResourceEventHandlerFuncs{
1✔
110
                        AddFunc: func(obj interface{}) {
2✔
111
                                cont.serviceAdded(obj)
1✔
112
                        },
1✔
113
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
114
                                cont.serviceUpdated(old, new)
1✔
115
                        },
1✔
116
                        DeleteFunc: func(obj interface{}) {
×
117
                                cont.serviceDeleted(obj)
×
118
                        },
×
119
                },
120
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
121
        )
122
}
123

124
func serviceLogger(log *logrus.Logger, as *v1.Service) *logrus.Entry {
1✔
125
        return log.WithFields(logrus.Fields{
1✔
126
                "namespace": as.ObjectMeta.Namespace,
1✔
127
                "name":      as.ObjectMeta.Name,
1✔
128
                "type":      as.Spec.Type,
1✔
129
        })
1✔
130
}
1✔
131

132
func (cont *AciController) queueIPNetPolUpdates(ips map[string]bool) {
1✔
133
        for ipStr := range ips {
2✔
134
                ip := net.ParseIP(ipStr)
1✔
135
                if ip == nil {
1✔
136
                        continue
×
137
                }
138
                entries, err := cont.netPolSubnetIndex.ContainingNetworks(ip)
1✔
139
                if err != nil {
1✔
140
                        cont.log.Error("Corrupted network policy IP index, err: ", err)
×
141
                        return
×
142
                }
×
143
                for _, entry := range entries {
2✔
144
                        for npkey := range entry.(*ipIndexEntry).keys {
2✔
145
                                cont.queueNetPolUpdateByKey(npkey)
1✔
146
                        }
1✔
147
                }
148
        }
149
}
150

151
func (cont *AciController) queuePortNetPolUpdates(ports map[string]targetPort) {
1✔
152
        for portkey := range ports {
2✔
153
                entry := cont.targetPortIndex[portkey]
1✔
154
                if entry == nil {
2✔
155
                        continue
1✔
156
                }
157
                for npkey := range entry.networkPolicyKeys {
2✔
158
                        cont.queueNetPolUpdateByKey(npkey)
1✔
159
                }
1✔
160
        }
161
}
162

163
func (cont *AciController) queueNetPolForEpAddrs(addrs []v1.EndpointAddress) {
1✔
164
        for _, addr := range addrs {
2✔
165
                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" ||
1✔
166
                        addr.TargetRef.Namespace == "" || addr.TargetRef.Name == "" {
2✔
167
                        continue
1✔
168
                }
169
                podkey := addr.TargetRef.Namespace + "/" + addr.TargetRef.Name
1✔
170
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
171
                ps := make(map[string]bool)
1✔
172
                for _, npkey := range npkeys {
2✔
173
                        cont.queueNetPolUpdateByKey(npkey)
1✔
174
                        ps[npkey] = true
1✔
175
                }
1✔
176
                // Process if the  any matching namedport wildcard policy is present
177
                // ignore np already processed policies
178
                cont.queueMatchingNamedNp(ps, podkey)
1✔
179
        }
180
}
181

182
func (cont *AciController) queueMatchingNamedNp(served map[string]bool, podkey string) {
1✔
183
        cont.indexMutex.Lock()
1✔
184
        for npkey := range cont.nmPortNp {
2✔
185
                if _, ok := served[npkey]; !ok {
2✔
186
                        if cont.checkPodNmpMatchesNp(npkey, podkey) {
2✔
187
                                cont.queueNetPolUpdateByKey(npkey)
1✔
188
                        }
1✔
189
                }
190
        }
191
        cont.indexMutex.Unlock()
1✔
192
}
193
func (cont *AciController) queueEndpointsNetPolUpdates(endpoints *v1.Endpoints) {
1✔
194
        for _, subset := range endpoints.Subsets {
2✔
195
                cont.queueNetPolForEpAddrs(subset.Addresses)
1✔
196
                cont.queueNetPolForEpAddrs(subset.NotReadyAddresses)
1✔
197
        }
1✔
198
}
199

200
func (cont *AciController) returnServiceIps(ips []net.IP) {
1✔
201
        for _, ip := range ips {
2✔
202
                if ip.To4() != nil {
2✔
203
                        cont.serviceIps.DeallocateIp(ip)
1✔
204
                } else if ip.To16() != nil {
3✔
205
                        cont.serviceIps.DeallocateIp(ip)
1✔
206
                }
1✔
207
        }
208
}
209

210
func returnIps(pool *netIps, ips []net.IP) {
1✔
211
        for _, ip := range ips {
2✔
212
                if ip.To4() != nil {
2✔
213
                        pool.V4.AddIp(ip)
1✔
214
                } else if ip.To16() != nil {
3✔
215
                        pool.V6.AddIp(ip)
1✔
216
                }
1✔
217
        }
218
}
219

220
func (cont *AciController) staticMonPolName() string {
1✔
221
        return cont.aciNameForKey("monPol", cont.env.ServiceBd())
1✔
222
}
1✔
223

224
func (cont *AciController) staticMonPolDn() string {
1✔
225
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
226
                return fmt.Sprintf("uni/tn-%s/ipslaMonitoringPol-%s",
1✔
227
                        cont.config.AciVrfTenant, cont.staticMonPolName())
1✔
228
        }
1✔
229
        return ""
×
230
}
231

232
func (cont *AciController) staticServiceObjs() apicapi.ApicSlice {
1✔
233
        var serviceObjs apicapi.ApicSlice
1✔
234

1✔
235
        // Service bridge domain
1✔
236
        bdName := cont.aciNameForKey("bd", cont.env.ServiceBd())
1✔
237
        bd := apicapi.NewFvBD(cont.config.AciVrfTenant, bdName)
1✔
238
        if apicapi.ApicVersion >= "6.0(3.84a)" {
1✔
239
                bd.SetAttr("serviceBdRoutingDisable", "yes")
×
240
        }
×
241
        bd.SetAttr("arpFlood", "yes")
1✔
242
        bd.SetAttr("ipLearning", "no")
1✔
243
        bd.SetAttr("unkMacUcastAct", "flood")
1✔
244
        bdToOut := apicapi.NewRsBdToOut(bd.GetDn(), cont.config.AciL3Out)
1✔
245
        bd.AddChild(bdToOut)
1✔
246
        bdToVrf := apicapi.NewRsCtx(bd.GetDn(), cont.config.AciVrf)
1✔
247
        bd.AddChild(bdToVrf)
1✔
248

1✔
249
        bdn := bd.GetDn()
1✔
250
        for _, cidr := range cont.config.NodeServiceSubnets {
2✔
251
                sn := apicapi.NewFvSubnet(bdn, cidr)
1✔
252
                bd.AddChild(sn)
1✔
253
        }
1✔
254
        serviceObjs = append(serviceObjs, bd)
1✔
255

1✔
256
        // Service IP SLA monitoring policy
1✔
257
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
258
                monPol := apicapi.NewFvIPSLAMonitoringPol(cont.config.AciVrfTenant,
1✔
259
                        cont.staticMonPolName())
1✔
260
                monPol.SetAttr("slaFrequency",
1✔
261
                        strconv.Itoa(cont.config.AciServiceMonitorInterval))
1✔
262
                serviceObjs = append(serviceObjs, monPol)
1✔
263
        }
1✔
264

265
        return serviceObjs
1✔
266
}
267

268
func (cont *AciController) initStaticServiceObjs() {
1✔
269
        cont.apicConn.WriteApicObjects(cont.config.AciPrefix+"_service_static",
1✔
270
                cont.staticServiceObjs())
1✔
271
}
1✔
272

273
func (cont *AciController) updateServicesForNode(nodename string) {
1✔
274
        cont.serviceEndPoints.UpdateServicesForNode(nodename)
1✔
275
}
1✔
276

277
// must have index lock
278
func (cont *AciController) getActiveFabricPathDn(node string) string {
×
279
        var fabricPathDn string
×
280
        sz := len(cont.nodeOpflexDevice[node])
×
281
        for i := range cont.nodeOpflexDevice[node] {
×
282
                device := cont.nodeOpflexDevice[node][sz-1-i]
×
283
                if device.GetAttrStr("state") == "connected" {
×
284
                        fabricPathDn = device.GetAttrStr("fabricPathDn")
×
285
                        break
×
286
                }
287
        }
288
        return fabricPathDn
×
289
}
290

291
func (cont *AciController) getOpflexOdevCount(node string) (int, string) {
×
292
        devIdCount := make(map[string]int)
×
293
        var maxCount int
×
294
        var fabricPathDn string
×
295
        for _, device := range cont.nodeOpflexDevice[node] {
×
296
                devId := device.GetAttrStr("devId")
×
297
                count, ok := devIdCount[devId]
×
298
                if ok {
×
299
                        devIdCount[devId] = count + 1
×
300
                } else {
×
301
                        devIdCount[devId] = 1
×
302
                }
×
303
                if devIdCount[devId] >= maxCount {
×
304
                        maxCount = devIdCount[devId]
×
305
                        fabricPathDn = device.GetAttrStr("fabricPathDn")
×
306
                }
×
307
        }
308
        return maxCount, fabricPathDn
×
309
}
310

311
func deleteDevicesFromList(delDevices, devices apicapi.ApicSlice) apicapi.ApicSlice {
×
312
        var newDevices apicapi.ApicSlice
×
313
        for _, device := range devices {
×
314
                found := false
×
315
                for delDev := range delDevices {
×
316
                        if reflect.DeepEqual(delDev, device) {
×
317
                                found = true
×
318
                        }
×
319
                }
320
                if !found {
×
321
                        newDevices = append(newDevices, device)
×
322
                }
×
323
        }
324
        return newDevices
×
325
}
326

327
func (cont *AciController) getAciPodSubnet(pod string) (string, error) {
×
328
        podslice := strings.Split(pod, "-")
×
329
        if len(podslice) < 2 {
×
330
                return "", fmt.Errorf("Failed to get podid from pod")
×
331
        }
×
332
        podid := podslice[1]
×
333
        var subnet string
×
334
        args := []string{
×
335
                "query-target=self",
×
336
        }
×
337
        url := fmt.Sprintf("/api/node/mo/uni/controller/setuppol/setupp-%s.json?%s", podid, strings.Join(args, "&"))
×
338
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
339
        if err != nil {
×
340
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
341
                return subnet, err
×
342
        }
×
343
        for _, obj := range apicresp.Imdata {
×
344
                for _, body := range obj {
×
345
                        tepPool, ok := body.Attributes["tepPool"].(string)
×
346
                        if ok {
×
347
                                subnet = tepPool
×
348
                                break
×
349
                        }
350
                }
351
        }
352
        return subnet, nil
×
353
}
354

355
func (cont *AciController) createAciPodAnnotation(node string) (aciPodAnnot, error) {
×
356
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
357
        nodeAciPodAnnot := cont.nodeACIPod[node]
×
358
        isSingleOdev := nodeAciPodAnnot.isSingleOpflexOdev
×
359
        if (odevCount == 0) ||
×
360
                (odevCount == 1 && !isSingleOdev) ||
×
361
                (odevCount == 1 && isSingleOdev && !nodeAciPodAnnot.disconnectTime.IsZero()) {
×
362
                if nodeAciPodAnnot.aciPod != "none" {
×
363
                        if nodeAciPodAnnot.disconnectTime.IsZero() {
×
364
                                nodeAciPodAnnot.disconnectTime = time.Now()
×
365
                        } else {
×
366
                                currentTime := time.Now()
×
367
                                diff := currentTime.Sub(nodeAciPodAnnot.disconnectTime)
×
368
                                if diff.Seconds() > float64(cont.config.OpflexDeviceReconnectWaitTimeout) {
×
369
                                        nodeAciPodAnnot.aciPod = "none"
×
370
                                        nodeAciPodAnnot.disconnectTime = time.Time{}
×
371
                                }
×
372
                        }
373
                }
374
                if odevCount == 1 {
×
375
                        nodeAciPodAnnot.isSingleOpflexOdev = true
×
376
                }
×
377
                nodeAciPodAnnot.connectTime = time.Time{}
×
378
                return nodeAciPodAnnot, nil
×
379
        } else if (odevCount == 2) ||
×
380
                (odevCount == 1 && isSingleOdev && nodeAciPodAnnot.disconnectTime.IsZero()) {
×
381
                // when there is already a connected opflex device,
×
382
                // fabricPathDn will have latest pod iformation
×
383
                // and annotation will be in the form pod-<podid>-<subnet of pod>
×
384
                nodeAciPodAnnot.disconnectTime = time.Time{}
×
385
                nodeAciPod := nodeAciPodAnnot.aciPod
×
386
                if odevCount == 2 {
×
387
                        nodeAciPodAnnot.isSingleOpflexOdev = false
×
388
                }
×
389
                if odevCount == 1 && nodeAciPod == "none" {
×
390
                        if nodeAciPodAnnot.connectTime.IsZero() {
×
391
                                nodeAciPodAnnot.connectTime = time.Now()
×
392
                                return nodeAciPodAnnot, nil
×
393
                        } else {
×
394
                                currentTime := time.Now()
×
395
                                diff := currentTime.Sub(nodeAciPodAnnot.connectTime)
×
396
                                if diff.Seconds() < float64(10) {
×
397
                                        return nodeAciPodAnnot, nil
×
398
                                }
×
399
                        }
400
                }
401
                nodeAciPodAnnot.connectTime = time.Time{}
×
402
                pathSlice := strings.Split(fabricPathDn, "/")
×
403
                if len(pathSlice) > 1 {
×
404
                        pod := pathSlice[1]
×
405

×
406
                        // when there is difference in pod info avaliable from fabricPathDn
×
407
                        // and what we have in cache, update info in cache and change annotation on node
×
408
                        if !strings.Contains(nodeAciPod, pod) {
×
409
                                subnet, err := cont.getAciPodSubnet(pod)
×
410
                                if err != nil {
×
411
                                        cont.log.Error("Failed to get subnet of aci pod ", err.Error())
×
412
                                        return nodeAciPodAnnot, err
×
413
                                } else {
×
414
                                        nodeAciPodAnnot.aciPod = pod + "-" + subnet
×
415
                                        return nodeAciPodAnnot, nil
×
416
                                }
×
417
                        } else {
×
418
                                return nodeAciPodAnnot, nil
×
419
                        }
×
420
                } else {
×
421
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
422
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
423
                }
×
424
        }
425
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation")
×
426
}
427

428
func (cont *AciController) createNodeAciPodAnnotation(node string) (aciPodAnnot, error) {
×
429
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
430
        nodeAciPodAnnot := cont.nodeACIPodAnnot[node]
×
431
        isSingleOdev := nodeAciPodAnnot.isSingleOpflexOdev
×
432
        if (odevCount == 0) ||
×
433
                (odevCount == 1 && !isSingleOdev) {
×
434
                if nodeAciPodAnnot.aciPod != "none" {
×
435
                        nodeAciPodAnnot.aciPod = "none"
×
436
                }
×
437
                if odevCount == 1 {
×
438
                        nodeAciPodAnnot.isSingleOpflexOdev = true
×
439
                }
×
440
                nodeAciPodAnnot.connectTime = time.Time{}
×
441
                return nodeAciPodAnnot, nil
×
442
        } else if (odevCount == 2) ||
×
443
                (odevCount == 1 && isSingleOdev) {
×
444
                nodeAciPod := nodeAciPodAnnot.aciPod
×
445
                if odevCount == 2 {
×
446
                        nodeAciPodAnnot.isSingleOpflexOdev = false
×
447
                }
×
448
                if odevCount == 1 && nodeAciPod == "none" {
×
449
                        if nodeAciPodAnnot.connectTime.IsZero() {
×
450
                                nodeAciPodAnnot.connectTime = time.Now()
×
451
                                return nodeAciPodAnnot, nil
×
452
                        } else {
×
453
                                currentTime := time.Now()
×
454
                                diff := currentTime.Sub(nodeAciPodAnnot.connectTime)
×
455
                                if diff.Seconds() < float64(10) {
×
456
                                        return nodeAciPodAnnot, nil
×
457
                                }
×
458
                        }
459
                }
460
                nodeAciPodAnnot.connectTime = time.Time{}
×
461
                pathSlice := strings.Split(fabricPathDn, "/")
×
462
                if len(pathSlice) > 1 {
×
463

×
464
                        nodeAciPodAnnot.aciPod = pathSlice[1]
×
465
                        return nodeAciPodAnnot, nil
×
466
                } else {
×
467
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
468
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
469
                }
×
470
        }
471
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation")
×
472
}
473

UNCOV
474
func (cont *AciController) checkChangeOfOpflexOdevAciPod() {
×
UNCOV
475
        var nodeAnnotationUpdates []string
×
UNCOV
476
        cont.apicConn.SyncMutex.Lock()
×
UNCOV
477
        syncDone := cont.apicConn.SyncDone
×
UNCOV
478
        cont.apicConn.SyncMutex.Unlock()
×
UNCOV
479

×
UNCOV
480
        if !syncDone {
×
UNCOV
481
                return
×
UNCOV
482
        }
×
483

484
        cont.indexMutex.Lock()
×
485
        for node := range cont.nodeACIPodAnnot {
×
486
                annot, err := cont.createNodeAciPodAnnotation(node)
×
487
                if err != nil {
×
488
                        cont.log.Error(err.Error())
×
489
                } else {
×
490
                        if annot != cont.nodeACIPodAnnot[node] {
×
491
                                cont.nodeACIPodAnnot[node] = annot
×
492
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
493
                        }
×
494
                }
495
        }
496
        cont.indexMutex.Unlock()
×
497
        if len(nodeAnnotationUpdates) > 0 {
×
498
                for _, updatednode := range nodeAnnotationUpdates {
×
499
                        go cont.env.NodeAnnotationChanged(updatednode)
×
500
                }
×
501
        }
502
}
503

504
func (cont *AciController) checkChangeOfOdevAciPod() {
×
505
        var nodeAnnotationUpdates []string
×
506
        cont.apicConn.SyncMutex.Lock()
×
507
        syncDone := cont.apicConn.SyncDone
×
508
        cont.apicConn.SyncMutex.Unlock()
×
509

×
510
        if !syncDone {
×
511
                return
×
512
        }
×
513

514
        cont.indexMutex.Lock()
×
515
        for node := range cont.nodeACIPod {
×
516
                annot, err := cont.createAciPodAnnotation(node)
×
517
                if err != nil {
×
518
                        cont.log.Error(err.Error())
×
519
                } else {
×
520
                        if annot != cont.nodeACIPod[node] {
×
521
                                cont.nodeACIPod[node] = annot
×
522
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
523
                        }
×
524
                }
525
        }
526
        cont.indexMutex.Unlock()
×
527
        if len(nodeAnnotationUpdates) > 0 {
×
528
                for _, updatednode := range nodeAnnotationUpdates {
×
529
                        go cont.env.NodeAnnotationChanged(updatednode)
×
530
                }
×
531
        }
532
}
533

534
func (cont *AciController) deleteOldOpflexDevices() {
1✔
535
        var nodeUpdates []string
1✔
536
        cont.indexMutex.Lock()
1✔
537
        for node, devices := range cont.nodeOpflexDevice {
1✔
538
                var delDevices apicapi.ApicSlice
×
539
                fabricPathDn := cont.getActiveFabricPathDn(node)
×
540
                if fabricPathDn != "" {
×
541
                        for _, device := range devices {
×
542
                                if device.GetAttrStr("delete") == "true" && device.GetAttrStr("fabricPathDn") != fabricPathDn {
×
543
                                        deleteTimeStr := device.GetAttrStr("deleteTime")
×
544
                                        deleteTime, err := time.Parse(time.RFC3339, deleteTimeStr)
×
545
                                        if err != nil {
×
546
                                                cont.log.Error("Failed to parse opflex device delete time: ", err)
×
547
                                                continue
×
548
                                        }
549
                                        now := time.Now()
×
550
                                        diff := now.Sub(deleteTime)
×
551
                                        if diff.Seconds() >= cont.config.OpflexDeviceDeleteTimeout {
×
552
                                                delDevices = append(delDevices, device)
×
553
                                        }
×
554
                                }
555
                        }
556
                        if len(delDevices) > 0 {
×
557
                                newDevices := deleteDevicesFromList(delDevices, devices)
×
558
                                cont.nodeOpflexDevice[node] = newDevices
×
559
                                if len(newDevices) == 0 {
×
560
                                        delete(cont.nodeOpflexDevice, node)
×
561
                                }
×
562
                                nodeUpdates = append(nodeUpdates, node)
×
563
                        }
564
                }
565
        }
566
        cont.indexMutex.Unlock()
1✔
567
        if len(nodeUpdates) > 0 {
1✔
568
                cont.postOpflexDeviceDelete(nodeUpdates)
×
569
        }
×
570
}
571

572
// must have index lock
573
func (cont *AciController) setDeleteFlagForOldDevices(node, fabricPathDn string) {
1✔
574
        for _, device := range cont.nodeOpflexDevice[node] {
2✔
575
                if device.GetAttrStr("fabricPathDn") != fabricPathDn {
1✔
576
                        t := time.Now()
×
577
                        device.SetAttr("delete", "true")
×
578
                        device.SetAttr("deleteTime", t.Format(time.RFC3339))
×
579
                }
×
580
        }
581
}
582

583
// must have index lock
584
func (cont *AciController) fabricPathForNode(name string) (string, bool) {
1✔
585
        sz := len(cont.nodeOpflexDevice[name])
1✔
586
        for i := range cont.nodeOpflexDevice[name] {
2✔
587
                device := cont.nodeOpflexDevice[name][sz-1-i]
1✔
588
                deviceState := device.GetAttrStr("state")
1✔
589
                if deviceState == "connected" {
2✔
590
                        if deviceState != device.GetAttrStr("prevState") {
2✔
591
                                cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Info("Processing fabric path for node ",
1✔
592
                                        "when connected device state is found")
1✔
593
                                device.SetAttr("prevState", deviceState)
1✔
594
                        }
1✔
595
                        fabricPathDn := device.GetAttrStr("fabricPathDn")
1✔
596
                        cont.setDeleteFlagForOldDevices(name, fabricPathDn)
1✔
597
                        return fabricPathDn, true
1✔
598
                } else {
1✔
599
                        device.SetAttr("prevState", deviceState)
1✔
600
                }
1✔
601
        }
602
        if sz > 0 {
2✔
603
                // When the opflex-device for a node changes, for example during a live migration,
1✔
604
                // we end up with both the old and the new device objects till the old object
1✔
605
                // ages out on APIC. The new object is at end of the devices list (see opflexDeviceChanged),
1✔
606
                // so we return the fabricPathDn of the last opflex-device.
1✔
607
                cont.fabricPathLogger(cont.nodeOpflexDevice[name][sz-1].GetAttrStr("hostName"),
1✔
608
                        cont.nodeOpflexDevice[name][sz-1]).Info("Processing fabricPathDn for node")
1✔
609
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("fabricPathDn"), true
1✔
610
        }
1✔
611
        return "", false
1✔
612
}
613

614
// must have index lock
615
func (cont *AciController) deviceMacForNode(name string) (string, bool) {
1✔
616
        sz := len(cont.nodeOpflexDevice[name])
1✔
617
        if sz > 0 {
2✔
618
                // When the opflex-device for a node changes, for example when the
1✔
619
                // node is recreated, we end up with both the old and the new
1✔
620
                // device objects till the old object ages out on APIC. The
1✔
621
                // new object is at end of the devices list (see
1✔
622
                // opflexDeviceChanged), so we return the MAC address of the
1✔
623
                // last opflex-device.
1✔
624
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("mac"), true
1✔
625
        }
1✔
626
        return "", false
1✔
627
}
628

629
func apicRedirectDst(rpDn string, ip string, mac string,
630
        descr string, healthGroupDn string, enablePbrTracking bool) apicapi.ApicObject {
1✔
631
        dst := apicapi.NewVnsRedirectDest(rpDn, ip, mac).SetAttr("descr", descr)
1✔
632
        if healthGroupDn != "" && enablePbrTracking {
2✔
633
                dst.AddChild(apicapi.NewVnsRsRedirectHealthGroup(dst.GetDn(),
1✔
634
                        healthGroupDn))
1✔
635
        }
1✔
636
        return dst
1✔
637
}
638

639
func (cont *AciController) apicRedirectPol(name string, tenantName string, nodes []string,
640
        nodeMap map[string]*metadata.ServiceEndpoint,
641
        monPolDn string, enablePbrTracking bool) (apicapi.ApicObject, string) {
1✔
642
        rp := apicapi.NewVnsSvcRedirectPol(tenantName, name)
1✔
643
        rp.SetAttr("thresholdDownAction", "deny")
1✔
644
        rpDn := rp.GetDn()
1✔
645
        for _, node := range nodes {
2✔
646
                cont.indexMutex.Lock()
1✔
647
                serviceEp, ok := nodeMap[node]
1✔
648
                if !ok {
1✔
649
                        continue
×
650
                }
651
                if serviceEp.Ipv4 != nil {
2✔
652
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv4.String(),
1✔
653
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
1✔
654
                }
1✔
655
                if serviceEp.Ipv6 != nil {
1✔
656
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv6.String(),
×
657
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
×
658
                }
×
659
                cont.indexMutex.Unlock()
1✔
660
        }
661
        if monPolDn != "" && enablePbrTracking {
2✔
662
                rp.AddChild(apicapi.NewVnsRsIPSLAMonitoringPol(rpDn, monPolDn))
1✔
663
        }
1✔
664
        return rp, rpDn
1✔
665
}
666

667
func apicExtNetCreate(enDn string, ingress string, ipv4 bool,
668
        cidr bool, sharedSec bool) apicapi.ApicObject {
1✔
669
        if !cidr {
2✔
670
                if ipv4 {
2✔
671
                        ingress += "/32"
1✔
672
                } else {
1✔
673
                        ingress += "/128"
×
674
                }
×
675
        }
676
        subnet := apicapi.NewL3extSubnet(enDn, ingress)
1✔
677
        if sharedSec {
2✔
678
                subnet.SetAttr("scope", "import-security,shared-security")
1✔
679
        }
1✔
680
        return subnet
1✔
681
}
682

683
func apicExtNet(name string, tenantName string, l3Out string,
684
        ingresses []string, sharedSecurity bool, snat bool) apicapi.ApicObject {
1✔
685
        en := apicapi.NewL3extInstP(tenantName, l3Out, name)
1✔
686
        enDn := en.GetDn()
1✔
687
        if snat {
2✔
688
                en.AddChild(apicapi.NewFvRsCons(enDn, name))
1✔
689
        } else {
2✔
690
                en.AddChild(apicapi.NewFvRsProv(enDn, name))
1✔
691
        }
1✔
692

693
        for _, ingress := range ingresses {
2✔
694
                ip, _, _ := net.ParseCIDR(ingress)
1✔
695
                // If ingress is a subnet
1✔
696
                if ip != nil {
2✔
697
                        if ip != nil && ip.To4() != nil {
2✔
698
                                subnet := apicExtNetCreate(enDn, ingress, true, true, sharedSecurity)
1✔
699
                                en.AddChild(subnet)
1✔
700
                        } else if ip != nil && ip.To16() != nil {
1✔
701
                                subnet := apicExtNetCreate(enDn, ingress, false, true, sharedSecurity)
×
702
                                en.AddChild(subnet)
×
703
                        }
×
704
                } else {
1✔
705
                        // If ingress is an IP address
1✔
706
                        ip := net.ParseIP(ingress)
1✔
707
                        if ip != nil && ip.To4() != nil {
2✔
708
                                subnet := apicExtNetCreate(enDn, ingress, true, false, sharedSecurity)
1✔
709
                                en.AddChild(subnet)
1✔
710
                        } else if ip != nil && ip.To16() != nil {
1✔
711
                                subnet := apicExtNetCreate(enDn, ingress, false, false, sharedSecurity)
×
712
                                en.AddChild(subnet)
×
713
                        }
×
714
                }
715
        }
716
        return en
1✔
717
}
718

719
func apicExtNetCons(conName string, tenantName string,
720
        l3Out string, net string) apicapi.ApicObject {
1✔
721
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
722
        return apicapi.NewFvRsCons(enDn, conName)
1✔
723
}
1✔
724

725
func apicExtNetProv(conName string, tenantName string,
726
        l3Out string, net string) apicapi.ApicObject {
1✔
727
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
728
        return apicapi.NewFvRsProv(enDn, conName)
1✔
729
}
1✔
730

731
// Helper function to check if a string item exists in a slice
732
func stringInSlice(str string, list []string) bool {
1✔
733
        for _, v := range list {
2✔
734
                if v == str {
2✔
735
                        return true
1✔
736
                }
1✔
737
        }
738
        return false
×
739
}
740

741
func validScope(scope string) bool {
1✔
742
        validValues := []string{"", "context", "tenant", "global"}
1✔
743
        return stringInSlice(scope, validValues)
1✔
744
}
1✔
745

746
func (cont *AciController) getGraphNameFromContract(name, tenantName string) (string, error) {
×
747
        var graphName string
×
748
        args := []string{
×
749
                "query-target=subtree",
×
750
        }
×
751
        url := fmt.Sprintf("/api/node/mo/uni/tn-%s/brc-%s.json?%s", tenantName, name, strings.Join(args, "&"))
×
752
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
753
        if err != nil {
×
754
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
755
                return graphName, err
×
756
        }
×
757
        for _, obj := range apicresp.Imdata {
×
758
                for class, body := range obj {
×
759
                        if class == "vzRsSubjGraphAtt" {
×
760
                                tnVnsAbsGraphName, ok := body.Attributes["tnVnsAbsGraphName"].(string)
×
761
                                if ok {
×
762
                                        graphName = tnVnsAbsGraphName
×
763
                                }
×
764
                                break
×
765
                        }
766
                }
767
        }
768
        cont.log.Debug("graphName: ", graphName)
×
769
        return graphName, err
×
770
}
771

772
func apicContract(conName string, tenantName string,
773
        graphName string, scopeName string, isSnatPbrFltrChain bool,
774
        customSGAnnot bool) apicapi.ApicObject {
1✔
775
        con := apicapi.NewVzBrCP(tenantName, conName)
1✔
776
        if scopeName != "" && scopeName != "context" {
2✔
777
                con.SetAttr("scope", scopeName)
1✔
778
        }
1✔
779
        cs := apicapi.NewVzSubj(con.GetDn(), "loadbalancedservice")
1✔
780
        csDn := cs.GetDn()
1✔
781
        if isSnatPbrFltrChain {
2✔
782
                cs.SetAttr("revFltPorts", "no")
1✔
783
                inTerm := apicapi.NewVzInTerm(csDn)
1✔
784
                outTerm := apicapi.NewVzOutTerm(csDn)
1✔
785
                inTerm.AddChild(apicapi.NewVzRsInTermGraphAtt(inTerm.GetDn(), graphName))
1✔
786
                inTerm.AddChild(apicapi.NewVzRsFiltAtt(inTerm.GetDn(), conName+"_fromCons-toProv"))
1✔
787
                outTerm.AddChild(apicapi.NewVzRsOutTermGraphAtt(outTerm.GetDn(), graphName))
1✔
788
                outTerm.AddChild(apicapi.NewVzRsFiltAtt(outTerm.GetDn(), conName+"_fromProv-toCons"))
1✔
789
                cs.AddChild(inTerm)
1✔
790
                cs.AddChild(outTerm)
1✔
791
        } else {
2✔
792
                cs.AddChild(apicapi.NewVzRsSubjGraphAtt(csDn, graphName, customSGAnnot))
1✔
793
                cs.AddChild(apicapi.NewVzRsSubjFiltAtt(csDn, conName))
1✔
794
        }
1✔
795
        con.AddChild(cs)
1✔
796
        return con
1✔
797
}
798

799
func apicDevCtx(name string, tenantName string,
800
        graphName string, deviceName string, bdName string, rpDn string, isSnatPbrFltrChain bool) apicapi.ApicObject {
1✔
801
        cc := apicapi.NewVnsLDevCtx(tenantName, name, graphName, "loadbalancer")
1✔
802
        ccDn := cc.GetDn()
1✔
803
        graphDn := fmt.Sprintf("uni/tn-%s/lDevVip-%s", tenantName, deviceName)
1✔
804
        lifDn := fmt.Sprintf("%s/lIf-%s", graphDn, "interface")
1✔
805
        bdDn := fmt.Sprintf("uni/tn-%s/BD-%s", tenantName, bdName)
1✔
806
        cc.AddChild(apicapi.NewVnsRsLDevCtxToLDev(ccDn, graphDn))
1✔
807
        rpDnBase := rpDn
1✔
808
        for _, ctxConn := range []string{"consumer", "provider"} {
2✔
809
                lifCtx := apicapi.NewVnsLIfCtx(ccDn, ctxConn)
1✔
810
                if isSnatPbrFltrChain {
2✔
811
                        if ctxConn == "consumer" {
2✔
812
                                rpDn = rpDnBase + "_Cons"
1✔
813
                        } else {
2✔
814
                                rpDn = rpDnBase + "_Prov"
1✔
815
                        }
1✔
816
                }
817
                lifCtxDn := lifCtx.GetDn()
1✔
818
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToSvcRedirectPol(lifCtxDn, rpDn))
1✔
819
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToBD(lifCtxDn, bdDn))
1✔
820
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToLIf(lifCtxDn, lifDn))
1✔
821
                cc.AddChild(lifCtx)
1✔
822
        }
823
        return cc
1✔
824
}
825

826
func apicFilterEntry(filterDn string, count string, p_start string,
827
        p_end string, protocol string, stateful string, snat bool, outTerm bool) apicapi.ApicObject {
1✔
828
        fe := apicapi.NewVzEntry(filterDn, count)
1✔
829
        fe.SetAttr("etherT", "ip")
1✔
830
        fe.SetAttr("prot", protocol)
1✔
831
        if snat {
2✔
832
                if outTerm {
2✔
833
                        if protocol == "tcp" {
2✔
834
                                fe.SetAttr("tcpRules", "est")
1✔
835
                        }
1✔
836
                        // Reverse the ports for outTerm
837
                        fe.SetAttr("dFromPort", p_start)
1✔
838
                        fe.SetAttr("dToPort", p_end)
1✔
839
                } else {
1✔
840
                        fe.SetAttr("sFromPort", p_start)
1✔
841
                        fe.SetAttr("sToPort", p_end)
1✔
842
                }
1✔
843
        } else {
1✔
844
                fe.SetAttr("dFromPort", p_start)
1✔
845
                fe.SetAttr("dToPort", p_end)
1✔
846
        }
1✔
847
        fe.SetAttr("stateful", stateful)
1✔
848
        return fe
1✔
849
}
850
func apicFilter(name string, tenantName string,
851
        portSpec []v1.ServicePort, snat bool, snatRange portRangeSnat) apicapi.ApicObject {
1✔
852
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
853
        filterDn := filter.GetDn()
1✔
854

1✔
855
        var i int
1✔
856
        var port v1.ServicePort
1✔
857
        for i, port = range portSpec {
2✔
858
                pstr := strconv.Itoa(int(port.Port))
1✔
859
                proto := getProtocolStr(port.Protocol)
1✔
860
                fe := apicFilterEntry(filterDn, strconv.Itoa(i), pstr,
1✔
861
                        pstr, proto, "no", false, false)
1✔
862
                filter.AddChild(fe)
1✔
863
        }
1✔
864

865
        if snat {
1✔
866
                portSpec := []portRangeSnat{snatRange}
×
867
                p_start := strconv.Itoa(portSpec[0].start)
×
868
                p_end := strconv.Itoa(portSpec[0].end)
×
869

×
870
                fe1 := apicFilterEntry(filterDn, strconv.Itoa(i+1), p_start,
×
871
                        p_end, "tcp", "no", false, false)
×
872
                filter.AddChild(fe1)
×
873
                fe2 := apicFilterEntry(filterDn, strconv.Itoa(i+2), p_start,
×
874
                        p_end, "udp", "no", false, false)
×
875
                filter.AddChild(fe2)
×
876
        }
×
877
        return filter
1✔
878
}
879

880
func apicFilterSnat(name string, tenantName string,
881
        portSpec []portRangeSnat, outTerm bool) apicapi.ApicObject {
1✔
882
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
883
        filterDn := filter.GetDn()
1✔
884

1✔
885
        p_start := strconv.Itoa(portSpec[0].start)
1✔
886
        p_end := strconv.Itoa(portSpec[0].end)
1✔
887

1✔
888
        fe := apicFilterEntry(filterDn, "0", p_start,
1✔
889
                p_end, "tcp", "no", true, outTerm)
1✔
890
        filter.AddChild(fe)
1✔
891
        fe1 := apicFilterEntry(filterDn, "1", p_start,
1✔
892
                p_end, "udp", "no", true, outTerm)
1✔
893
        filter.AddChild(fe1)
1✔
894

1✔
895
        return filter
1✔
896
}
1✔
897

898
func (cont *AciController) updateServiceDeviceInstance(key string,
899
        service *v1.Service) error {
1✔
900
        cont.indexMutex.Lock()
1✔
901
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
902
        cont.serviceEndPoints.GetnodesMetadata(key, service, nodeMap)
1✔
903
        cont.indexMutex.Unlock()
1✔
904

1✔
905
        var nodes []string
1✔
906
        for node := range nodeMap {
2✔
907
                nodes = append(nodes, node)
1✔
908
        }
1✔
909
        sort.Strings(nodes)
1✔
910
        name := cont.aciNameForKey("svc", key)
1✔
911
        var conScope string
1✔
912
        scopeVal, ok := service.ObjectMeta.Annotations[metadata.ServiceContractScopeAnnotation]
1✔
913
        if ok {
2✔
914
                normScopeVal := strings.ToLower(scopeVal)
1✔
915
                if !validScope(normScopeVal) {
1✔
916
                        errString := "Invalid service contract scope value provided " + scopeVal
×
917
                        err := errors.New(errString)
×
918
                        serviceLogger(cont.log, service).Error("Could not create contract: ", err)
×
919
                        return err
×
920
                } else {
1✔
921
                        conScope = normScopeVal
1✔
922
                }
1✔
923
        } else {
1✔
924
                conScope = DefaultServiceContractScope
1✔
925
        }
1✔
926

927
        var sharedSecurity bool
1✔
928
        if conScope == "global" {
2✔
929
                sharedSecurity = true
1✔
930
        } else {
2✔
931
                sharedSecurity = DefaultServiceExtSubNetShared
1✔
932
        }
1✔
933

934
        graphName := cont.aciNameForKey("svc", "global")
1✔
935
        deviceName := cont.aciNameForKey("svc", "global")
1✔
936
        _, customSGAnnPresent := service.ObjectMeta.Annotations[metadata.ServiceGraphNameAnnotation]
1✔
937
        if customSGAnnPresent {
1✔
938
                customSG, err := cont.getGraphNameFromContract(name, cont.config.AciVrfTenant)
×
939
                if err == nil {
×
940
                        graphName = customSG
×
941
                }
×
942
        }
943
        cont.log.Debug("Using service graph ", graphName, " for service ", key)
1✔
944

1✔
945
        var serviceObjs apicapi.ApicSlice
1✔
946
        if len(nodes) > 0 {
2✔
947
                // 1. Service redirect policy
1✔
948
                // The service redirect policy contains the MAC address
1✔
949
                // and IP address of each of the service endpoints for
1✔
950
                // each node that hosts a pod for this service.  The
1✔
951
                // example below shows the case of two nodes.
1✔
952
                rp, rpDn :=
1✔
953
                        cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
1✔
954
                                nodeMap, cont.staticMonPolDn(), cont.config.AciPbrTrackingNonSnat)
1✔
955
                serviceObjs = append(serviceObjs, rp)
1✔
956

1✔
957
                // 2. Service graph contract and external network
1✔
958
                // The service graph contract must be bound to the service
1✔
959
                // graph.  This contract must be consumed by the default
1✔
960
                // layer 3 network and provided by the service layer 3
1✔
961
                // network.
1✔
962
                {
2✔
963
                        var ingresses []string
1✔
964
                        for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
965
                                ingresses = append(ingresses, ingress.IP)
1✔
966
                        }
1✔
967
                        serviceObjs = append(serviceObjs,
1✔
968
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
969
                                        cont.config.AciL3Out, ingresses, sharedSecurity, false))
1✔
970
                }
971

972
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, false, customSGAnnPresent)
1✔
973
                serviceObjs = append(serviceObjs, contract)
1✔
974
                for _, net := range cont.config.AciExtNetworks {
2✔
975
                        serviceObjs = append(serviceObjs,
1✔
976
                                apicExtNetCons(name, cont.config.AciVrfTenant,
1✔
977
                                        cont.config.AciL3Out, net))
1✔
978
                }
1✔
979

980
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
981
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
982

1✔
983
                _, snat := cont.snatServices[key]
1✔
984
                filter := apicFilter(name, cont.config.AciVrfTenant,
1✔
985
                        service.Spec.Ports, snat, defaultPortRange)
1✔
986
                serviceObjs = append(serviceObjs, filter)
1✔
987

1✔
988
                // 3. Device cluster context
1✔
989
                // The logical device context binds the service contract
1✔
990
                // to the redirect policy and the device cluster and
1✔
991
                // bridge domain for the device cluster.
1✔
992
                serviceObjs = append(serviceObjs,
1✔
993
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, deviceName,
1✔
994
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, false))
1✔
995
        }
996

997
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
998
        return nil
1✔
999
}
1000

1001
func (cont *AciController) updateServiceDeviceInstanceSnat(key string) error {
1✔
1002
        nodeList := cont.nodeIndexer.List()
1✔
1003
        cont.indexMutex.Lock()
1✔
1004
        if len(cont.nodeServiceMetaCache) == 0 {
2✔
1005
                cont.indexMutex.Unlock()
1✔
1006
                return nil
1✔
1007
        }
1✔
1008
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
1009
        for itr, nodeItem := range nodeList {
2✔
1010
                if itr == cont.config.MaxSvcGraphNodes {
1✔
1011
                        break
×
1012
                }
1013
                node := nodeItem.(*v1.Node)
1✔
1014
                nodeName := node.ObjectMeta.Name
1✔
1015
                nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
1016
                if !ok {
1✔
UNCOV
1017
                        continue
×
1018
                }
1019
                _, ok = cont.fabricPathForNode(nodeName)
1✔
1020
                if !ok {
1✔
1021
                        continue
×
1022
                }
1023
                nodeLabels := node.ObjectMeta.Labels
1✔
1024
                excludeNode := cont.nodeLabelsInExcludeList(nodeLabels)
1✔
1025
                if excludeNode {
1✔
1026
                        continue
×
1027
                }
1028
                nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
1029
        }
1030
        cont.indexMutex.Unlock()
1✔
1031

1✔
1032
        var nodes []string
1✔
1033
        for node := range nodeMap {
2✔
1034
                nodes = append(nodes, node)
1✔
1035
        }
1✔
1036
        sort.Strings(nodes)
1✔
1037
        name := cont.aciNameForKey("snat", key)
1✔
1038
        var conScope = cont.config.SnatSvcContractScope
1✔
1039
        sharedSecurity := true
1✔
1040

1✔
1041
        graphName := cont.aciNameForKey("svc", "global")
1✔
1042
        var serviceObjs apicapi.ApicSlice
1✔
1043
        if len(nodes) > 0 {
2✔
1044
                // 1. Service redirect policy
1✔
1045
                // The service redirect policy contains the MAC address
1✔
1046
                // and IP address of each of the service endpoints for
1✔
1047
                // each node that hosts a pod for this service.
1✔
1048
                // For SNAT with the introduction of filter-chain usage, to work-around
1✔
1049
                // an APIC limitation, creating two PBR policies with same nodes.
1✔
1050
                var rpDn string
1✔
1051
                var rp apicapi.ApicObject
1✔
1052
                if cont.apicConn.SnatPbrFltrChain {
2✔
1053
                        rpCons, rpDnCons :=
1✔
1054
                                cont.apicRedirectPol(name+"_Cons", cont.config.AciVrfTenant, nodes,
1✔
1055
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1056
                        serviceObjs = append(serviceObjs, rpCons)
1✔
1057
                        rpProv, _ :=
1✔
1058
                                cont.apicRedirectPol(name+"_Prov", cont.config.AciVrfTenant, nodes,
1✔
1059
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1060
                        serviceObjs = append(serviceObjs, rpProv)
1✔
1061
                        rpDn = strings.TrimSuffix(rpDnCons, "_Cons")
1✔
1062
                } else {
1✔
1063
                        rp, rpDn =
×
1064
                                cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
×
1065
                                        nodeMap, cont.staticMonPolDn(), true)
×
1066
                        serviceObjs = append(serviceObjs, rp)
×
1067
                }
×
1068
                // 2. Service graph contract and external network
1069
                // The service graph contract must be bound to the
1070
                // service
1071
                // graph.  This contract must be consumed by the default
1072
                // layer 3 network and provided by the service layer 3
1073
                // network.
1074
                {
1✔
1075
                        var ingresses []string
1✔
1076
                        for _, policy := range cont.snatPolicyCache {
2✔
1077
                                ingresses = append(ingresses, policy.SnatIp...)
1✔
1078
                        }
1✔
1079
                        serviceObjs = append(serviceObjs,
1✔
1080
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1081
                                        cont.config.AciL3Out, ingresses, sharedSecurity, true))
1✔
1082
                }
1083

1084
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, cont.apicConn.SnatPbrFltrChain, false)
1✔
1085
                serviceObjs = append(serviceObjs, contract)
1✔
1086

1✔
1087
                for _, net := range cont.config.AciExtNetworks {
2✔
1088
                        serviceObjs = append(serviceObjs,
1✔
1089
                                apicExtNetProv(name, cont.config.AciVrfTenant,
1✔
1090
                                        cont.config.AciL3Out, net))
1✔
1091
                }
1✔
1092

1093
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1094
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1095
                portSpec := []portRangeSnat{defaultPortRange}
1✔
1096
                if cont.apicConn.SnatPbrFltrChain {
2✔
1097
                        filterIn := apicFilterSnat(name+"_fromCons-toProv", cont.config.AciVrfTenant, portSpec, false)
1✔
1098
                        serviceObjs = append(serviceObjs, filterIn)
1✔
1099
                        filterOut := apicFilterSnat(name+"_fromProv-toCons", cont.config.AciVrfTenant, portSpec, true)
1✔
1100
                        serviceObjs = append(serviceObjs, filterOut)
1✔
1101
                } else {
1✔
1102
                        filter := apicFilterSnat(name, cont.config.AciVrfTenant, portSpec, false)
×
1103
                        serviceObjs = append(serviceObjs, filter)
×
1104
                }
×
1105
                // 3. Device cluster context
1106
                // The logical device context binds the service contract
1107
                // to the redirect policy and the device cluster and
1108
                // bridge domain for the device cluster.
1109
                serviceObjs = append(serviceObjs,
1✔
1110
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, graphName,
1✔
1111
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, cont.apicConn.SnatPbrFltrChain))
1✔
1112
        }
1113
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1114
        return nil
1✔
1115
}
1116

1117
func (cont *AciController) nodeLabelsInExcludeList(Labels map[string]string) bool {
1✔
1118
        nodeSnatRedirectExclude := cont.config.NodeSnatRedirectExclude
1✔
1119

1✔
1120
        for _, nodeGroup := range nodeSnatRedirectExclude {
1✔
1121
                if len(nodeGroup.Labels) == 0 {
×
1122
                        continue
×
1123
                }
1124
                matchFound := true
×
1125
                for _, label := range nodeGroup.Labels {
×
1126
                        if _, ok := Labels["node-role.kubernetes.io/"+label]; !ok {
×
1127
                                matchFound = false
×
1128
                                break
×
1129
                        }
1130
                }
1131
                if matchFound {
×
1132
                        return true
×
1133
                }
×
1134
        }
1135
        return false
1✔
1136
}
1137

1138
func (cont *AciController) queueServiceUpdateByKey(key string) {
1✔
1139
        cont.serviceQueue.Add(key)
1✔
1140
}
1✔
1141

1142
func (cont *AciController) queueServiceUpdate(service *v1.Service) {
1✔
1143
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
1144
        if err != nil {
1✔
1145
                serviceLogger(cont.log, service).
×
1146
                        Error("Could not create service key: ", err)
×
1147
                return
×
1148
        }
×
1149
        cont.serviceQueue.Add(key)
1✔
1150
}
1151

1152
func apicDeviceCluster(name string, vrfTenant string,
1153
        physDom string, encap string,
1154
        nodes []string, nodeMap map[string]string) (apicapi.ApicObject, string) {
1✔
1155
        dc := apicapi.NewVnsLDevVip(vrfTenant, name)
1✔
1156
        dc.SetAttr("managed", "no")
1✔
1157
        dcDn := dc.GetDn()
1✔
1158
        dc.AddChild(apicapi.NewVnsRsALDevToPhysDomP(dcDn,
1✔
1159
                fmt.Sprintf("uni/phys-%s", physDom)))
1✔
1160
        lif := apicapi.NewVnsLIf(dcDn, "interface")
1✔
1161
        lif.SetAttr("encap", encap)
1✔
1162
        lifDn := lif.GetDn()
1✔
1163

1✔
1164
        for _, node := range nodes {
2✔
1165
                path, ok := nodeMap[node]
1✔
1166
                if !ok {
1✔
1167
                        continue
×
1168
                }
1169

1170
                cdev := apicapi.NewVnsCDev(dcDn, node)
1✔
1171
                cif := apicapi.NewVnsCif(cdev.GetDn(), "interface")
1✔
1172
                cif.AddChild(apicapi.NewVnsRsCIfPathAtt(cif.GetDn(), path))
1✔
1173
                cdev.AddChild(cif)
1✔
1174
                lif.AddChild(apicapi.NewVnsRsCIfAttN(lifDn, cif.GetDn()))
1✔
1175
                dc.AddChild(cdev)
1✔
1176
        }
1177

1178
        dc.AddChild(lif)
1✔
1179

1✔
1180
        return dc, dcDn
1✔
1181
}
1182

1183
func apicServiceGraph(name string, tenantName string,
1184
        dcDn string) apicapi.ApicObject {
1✔
1185
        sg := apicapi.NewVnsAbsGraph(tenantName, name)
1✔
1186
        sgDn := sg.GetDn()
1✔
1187
        var provDn string
1✔
1188
        var consDn string
1✔
1189
        var cTermDn string
1✔
1190
        var pTermDn string
1✔
1191
        {
2✔
1192
                an := apicapi.NewVnsAbsNode(sgDn, "loadbalancer")
1✔
1193
                an.SetAttr("managed", "no")
1✔
1194
                an.SetAttr("routingMode", "Redirect")
1✔
1195
                anDn := an.GetDn()
1✔
1196
                cons := apicapi.NewVnsAbsFuncConn(anDn, "consumer")
1✔
1197
                consDn = cons.GetDn()
1✔
1198
                an.AddChild(cons)
1✔
1199
                prov := apicapi.NewVnsAbsFuncConn(anDn, "provider")
1✔
1200
                provDn = prov.GetDn()
1✔
1201
                an.AddChild(prov)
1✔
1202
                an.AddChild(apicapi.NewVnsRsNodeToLDev(anDn, dcDn))
1✔
1203
                sg.AddChild(an)
1✔
1204
        }
1✔
1205
        {
1✔
1206
                tnc := apicapi.NewVnsAbsTermNodeCon(sgDn, "T1")
1✔
1207
                tncDn := tnc.GetDn()
1✔
1208
                cTerm := apicapi.NewVnsAbsTermConn(tncDn)
1✔
1209
                cTermDn = cTerm.GetDn()
1✔
1210
                tnc.AddChild(cTerm)
1✔
1211
                tnc.AddChild(apicapi.NewVnsInTerm(tncDn))
1✔
1212
                tnc.AddChild(apicapi.NewVnsOutTerm(tncDn))
1✔
1213
                sg.AddChild(tnc)
1✔
1214
        }
1✔
1215
        {
1✔
1216
                tnp := apicapi.NewVnsAbsTermNodeProv(sgDn, "T2")
1✔
1217
                tnpDn := tnp.GetDn()
1✔
1218
                pTerm := apicapi.NewVnsAbsTermConn(tnpDn)
1✔
1219
                pTermDn = pTerm.GetDn()
1✔
1220
                tnp.AddChild(pTerm)
1✔
1221
                tnp.AddChild(apicapi.NewVnsInTerm(tnpDn))
1✔
1222
                tnp.AddChild(apicapi.NewVnsOutTerm(tnpDn))
1✔
1223
                sg.AddChild(tnp)
1✔
1224
        }
1✔
1225
        {
1✔
1226
                acc := apicapi.NewVnsAbsConnection(sgDn, "C1")
1✔
1227
                acc.SetAttr("connDir", "provider")
1✔
1228
                accDn := acc.GetDn()
1✔
1229
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, consDn))
1✔
1230
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, cTermDn))
1✔
1231
                sg.AddChild(acc)
1✔
1232
        }
1✔
1233
        {
1✔
1234
                acp := apicapi.NewVnsAbsConnection(sgDn, "C2")
1✔
1235
                acp.SetAttr("connDir", "provider")
1✔
1236
                acpDn := acp.GetDn()
1✔
1237
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, provDn))
1✔
1238
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, pTermDn))
1✔
1239
                sg.AddChild(acp)
1✔
1240
        }
1✔
1241
        return sg
1✔
1242
}
1243
func (cont *AciController) updateDeviceCluster() {
1✔
1244
        nodeMap := make(map[string]string)
1✔
1245
        cont.indexMutex.Lock()
1✔
1246
        for node := range cont.nodeOpflexDevice {
2✔
1247
                cont.log.Debug("Processing node in nodeOpflexDevice cache : ", node)
1✔
1248
                fabricPath, ok := cont.fabricPathForNode(node)
1✔
1249
                if !ok {
2✔
1250
                        continue
1✔
1251
                }
1252
                nodeMap[node] = fabricPath
1✔
1253
        }
1254
        cont.indexMutex.Unlock()
1✔
1255

1✔
1256
        var nodes []string
1✔
1257
        for node := range nodeMap {
2✔
1258
                nodes = append(nodes, node)
1✔
1259
        }
1✔
1260
        sort.Strings(nodes)
1✔
1261

1✔
1262
        name := cont.aciNameForKey("svc", "global")
1✔
1263
        var serviceObjs apicapi.ApicSlice
1✔
1264

1✔
1265
        // 1. Device cluster:
1✔
1266
        // The device cluster is a set of physical paths that need to be
1✔
1267
        // created for each node in the cluster, that correspond to the
1✔
1268
        // service interface for each node.
1✔
1269
        dc, dcDn := apicDeviceCluster(name, cont.config.AciVrfTenant,
1✔
1270
                cont.config.AciServicePhysDom, cont.config.AciServiceEncap,
1✔
1271
                nodes, nodeMap)
1✔
1272
        serviceObjs = append(serviceObjs, dc)
1✔
1273

1✔
1274
        // 2. Service graph template
1✔
1275
        // The service graph controls how the traffic will be redirected.
1✔
1276
        // A service graph must be created for each device cluster.
1✔
1277
        serviceObjs = append(serviceObjs,
1✔
1278
                apicServiceGraph(name, cont.config.AciVrfTenant, dcDn))
1✔
1279

1✔
1280
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1281
}
1282

1283
func (cont *AciController) fabricPathLogger(node string,
1284
        obj apicapi.ApicObject) *logrus.Entry {
1✔
1285
        return cont.log.WithFields(logrus.Fields{
1✔
1286
                "fabricPath": obj.GetAttr("fabricPathDn"),
1✔
1287
                "mac":        obj.GetAttr("mac"),
1✔
1288
                "node":       node,
1✔
1289
                "obj":        obj,
1✔
1290
        })
1✔
1291
}
1✔
1292

1293
func (cont *AciController) opflexDeviceChanged(obj apicapi.ApicObject) {
1✔
1294
        devType := obj.GetAttrStr("devType")
1✔
1295
        domName := obj.GetAttrStr("domName")
1✔
1296
        ctrlrName := obj.GetAttrStr("ctrlrName")
1✔
1297

1✔
1298
        if (devType == cont.env.OpFlexDeviceType()) && (domName == cont.config.AciVmmDomain) && (ctrlrName == cont.config.AciVmmController) {
2✔
1299
                cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Processing opflex device update")
1✔
1300
                if obj.GetAttrStr("state") == "disconnected" {
2✔
1301
                        cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Opflex device disconnected")
1✔
1302
                        cont.indexMutex.Lock()
1✔
1303
                        for node, devices := range cont.nodeOpflexDevice {
1✔
1304
                                if node == obj.GetAttrStr("hostName") {
×
1305
                                        for _, device := range devices {
×
1306
                                                if device.GetDn() == obj.GetDn() {
×
1307
                                                        device.SetAttr("state", "disconnected")
×
1308
                                                        cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Debug("Opflex device cache updated for disconnected node")
×
1309
                                                }
×
1310
                                        }
1311
                                        cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", devices)
×
1312
                                        break
×
1313
                                }
1314
                        }
1315
                        cont.indexMutex.Unlock()
1✔
1316
                        cont.updateDeviceCluster()
1✔
1317
                        return
1✔
1318
                }
1319
                var nodeUpdates []string
1✔
1320

1✔
1321
                cont.indexMutex.Lock()
1✔
1322
                nodefound := false
1✔
1323
                for node, devices := range cont.nodeOpflexDevice {
2✔
1324
                        found := false
1✔
1325

1✔
1326
                        if node == obj.GetAttrStr("hostName") {
2✔
1327
                                nodefound = true
1✔
1328
                        }
1✔
1329

1330
                        for i, device := range devices {
2✔
1331
                                if device.GetDn() != obj.GetDn() {
2✔
1332
                                        continue
1✔
1333
                                }
1334
                                found = true
1✔
1335

1✔
1336
                                if obj.GetAttrStr("hostName") != node {
2✔
1337
                                        cont.fabricPathLogger(node, device).
1✔
1338
                                                Debug("Moving opflex device from node")
1✔
1339

1✔
1340
                                        devices = append(devices[:i], devices[i+1:]...)
1✔
1341
                                        cont.nodeOpflexDevice[node] = devices
1✔
1342
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1343
                                        break
1✔
1344
                                } else if (device.GetAttrStr("mac") != obj.GetAttrStr("mac")) ||
1✔
1345
                                        (device.GetAttrStr("fabricPathDn") != obj.GetAttrStr("fabricPathDn")) ||
1✔
1346
                                        (device.GetAttrStr("state") != obj.GetAttrStr("state")) {
2✔
1347
                                        cont.fabricPathLogger(node, obj).
1✔
1348
                                                Debug("Updating opflex device")
1✔
1349

1✔
1350
                                        devices = append(append(devices[:i], devices[i+1:]...), obj)
1✔
1351
                                        cont.nodeOpflexDevice[node] = devices
1✔
1352
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1353
                                        break
1✔
1354
                                }
1355
                        }
1356
                        if !found && obj.GetAttrStr("hostName") == node {
2✔
1357
                                cont.fabricPathLogger(node, obj).
1✔
1358
                                        Debug("Appending opflex device")
1✔
1359

1✔
1360
                                devices = append(devices, obj)
1✔
1361
                                cont.nodeOpflexDevice[node] = devices
1✔
1362
                                nodeUpdates = append(nodeUpdates, node)
1✔
1363
                        }
1✔
1364
                }
1365
                if !nodefound {
2✔
1366
                        node := obj.GetAttrStr("hostName")
1✔
1367
                        cont.fabricPathLogger(node, obj).Debug("Adding opflex device")
1✔
1368
                        cont.nodeOpflexDevice[node] = apicapi.ApicSlice{obj}
1✔
1369
                        nodeUpdates = append(nodeUpdates, node)
1✔
1370
                }
1✔
1371
                cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", cont.nodeOpflexDevice[obj.GetAttrStr("hostName")])
1✔
1372
                cont.indexMutex.Unlock()
1✔
1373

1✔
1374
                for _, node := range nodeUpdates {
2✔
1375
                        cont.env.NodeServiceChanged(node)
1✔
1376
                        cont.erspanSyncOpflexDev()
1✔
1377
                }
1✔
1378
                cont.updateDeviceCluster()
1✔
1379
        }
1380
}
1381

1382
func (cont *AciController) postOpflexDeviceDelete(nodes []string) {
1✔
1383
        cont.updateDeviceCluster()
1✔
1384
        for _, node := range nodes {
2✔
1385
                cont.env.NodeServiceChanged(node)
1✔
1386
                cont.erspanSyncOpflexDev()
1✔
1387
        }
1✔
1388
}
1389

1390
func (cont *AciController) opflexDeviceDeleted(dn string) {
1✔
1391
        var nodeUpdates []string
1✔
1392
        var dnFound bool //to check if the dn belongs to this cluster
1✔
1393
        cont.indexMutex.Lock()
1✔
1394
        for node, devices := range cont.nodeOpflexDevice {
2✔
1395
                for i, device := range devices {
2✔
1396
                        if device.GetDn() != dn {
2✔
1397
                                continue
1✔
1398
                        }
1399
                        dnFound = true
1✔
1400
                        cont.fabricPathLogger(node, device).
1✔
1401
                                Debug("Deleting opflex device path")
1✔
1402
                        devices = append(devices[:i], devices[i+1:]...)
1✔
1403
                        cont.nodeOpflexDevice[node] = devices
1✔
1404
                        nodeUpdates = append(nodeUpdates, node)
1✔
1405
                        break
1✔
1406
                }
1407
                if len(devices) == 0 {
2✔
1408
                        delete(cont.nodeOpflexDevice, node)
1✔
1409
                }
1✔
1410
        }
1411
        cont.indexMutex.Unlock()
1✔
1412

1✔
1413
        if dnFound {
2✔
1414
                cont.postOpflexDeviceDelete(nodeUpdates)
1✔
1415
        }
1✔
1416
}
1417

1418
func (cont *AciController) writeApicSvc(key string, service *v1.Service) {
1✔
1419
        if cont.config.ChainedMode {
1✔
1420
                return
×
1421
        }
×
1422
        aobj := apicapi.NewVmmInjectedSvc(cont.vmmDomainProvider(),
1✔
1423
                cont.config.AciVmmDomain, cont.config.AciVmmController,
1✔
1424
                service.Namespace, service.Name)
1✔
1425
        aobjDn := aobj.GetDn()
1✔
1426
        aobj.SetAttr("guid", string(service.UID))
1✔
1427

1✔
1428
        svcns := service.ObjectMeta.Namespace
1✔
1429
        _, exists, err := cont.namespaceIndexer.GetByKey(svcns)
1✔
1430
        if err != nil {
1✔
1431
                cont.log.Error("Failed to lookup ns : ", svcns, " ", err)
×
1432
                return
×
1433
        }
×
1434
        if !exists {
2✔
1435
                cont.log.Debug("Namespace of service ", service.ObjectMeta.Name, ": ", svcns, " doesn't exist, hence not sending an update to the APIC")
1✔
1436
                return
1✔
1437
        }
1✔
1438

1439
        if !cont.serviceEndPoints.SetServiceApicObject(aobj, service) {
2✔
1440
                return
1✔
1441
        }
1✔
1442
        var setApicSvcDnsName bool
1✔
1443
        if len(cont.config.ApicHosts) != 0 && apicapi.ApicVersion >= "5.1" {
1✔
1444
                setApicSvcDnsName = true
×
1445
        }
×
1446
        // APIC model only allows one of these
1447
        for _, ingress := range service.Status.LoadBalancer.Ingress {
1✔
1448
                if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1449
                        aobj.SetAttr("lbIp", ingress.IP)
×
1450
                } else if ingress.Hostname != "" {
×
1451
                        ipList, err := net.LookupHost(ingress.Hostname)
×
1452
                        if err == nil && len(ipList) > 0 {
×
1453
                                aobj.SetAttr("lbIp", ipList[0])
×
1454
                        } else {
×
1455
                                cont.log.Errorf("Lookup: err: %v, ipList: %+v", err, ipList)
×
1456
                        }
×
1457
                }
1458
                break
×
1459
        }
1460
        if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != "None" {
2✔
1461
                aobj.SetAttr("clusterIp", service.Spec.ClusterIP)
1✔
1462
        }
1✔
1463

1464
        var t string
1✔
1465
        switch service.Spec.Type {
1✔
1466
        case v1.ServiceTypeClusterIP:
×
1467
                t = "clusterIp"
×
1468
        case v1.ServiceTypeNodePort:
×
1469
                t = "nodePort"
×
1470
        case v1.ServiceTypeLoadBalancer:
1✔
1471
                t = "loadBalancer"
1✔
1472
        case v1.ServiceTypeExternalName:
×
1473
                t = "externalName"
×
1474
        }
1475
        if t != "" {
2✔
1476
                aobj.SetAttr("type", t)
1✔
1477
        }
1✔
1478

1479
        if setApicSvcDnsName || cont.config.Flavor == "k8s-overlay" {
1✔
1480
                dnsName := fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace)
×
1481

×
1482
                for _, ingress := range service.Status.LoadBalancer.Ingress {
×
1483
                        if ingress.Hostname != "" {
×
1484
                                aobj.SetAttr("dnsName", ingress.Hostname)
×
1485
                        } else if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1486
                                aobj.SetAttr("dnsName", dnsName)
×
1487
                        }
×
1488
                }
1489
                if t == "clusterIp" || t == "nodePort" || t == "externalName" {
×
1490
                        aobj.SetAttr("dnsName", dnsName)
×
1491
                }
×
1492
        }
1493
        for _, port := range service.Spec.Ports {
2✔
1494
                proto := getProtocolStr(port.Protocol)
1✔
1495
                p := apicapi.NewVmmInjectedSvcPort(aobjDn,
1✔
1496
                        strconv.Itoa(int(port.Port)), proto, port.TargetPort.String())
1✔
1497
                p.SetAttr("nodePort", strconv.Itoa(int(port.NodePort)))
1✔
1498
                aobj.AddChild(p)
1✔
1499
        }
1✔
1500
        if cont.config.EnableVmmInjectedLabels && service.ObjectMeta.Labels != nil && apicapi.ApicVersion >= "5.2" {
1✔
1501
                for key, val := range service.ObjectMeta.Labels {
×
1502
                        newLabelKey := cont.aciNameForKey("label", key)
×
1503
                        label := apicapi.NewVmmInjectedLabel(aobj.GetDn(),
×
1504
                                newLabelKey, val)
×
1505
                        aobj.AddChild(label)
×
1506
                }
×
1507
        }
1508
        name := cont.aciNameForKey("service-vmm", key)
1✔
1509
        cont.log.Debug("Write Service Object: ", aobj)
1✔
1510
        cont.apicConn.WriteApicObjects(name, apicapi.ApicSlice{aobj})
1✔
1511
        cont.log.Debugf("svcObject: %+v", aobj)
1✔
1512
}
1513

1514
func removeAllConditions(conditions []metav1.Condition, conditionType string) []metav1.Condition {
1✔
1515
        i := 0
1✔
1516
        for _, cond := range conditions {
1✔
1517
                if cond.Type != conditionType {
×
1518
                        conditions[i] = cond
×
1519
                }
×
1520
        }
1521
        return conditions[:i]
1✔
1522
}
1523

1524
func (cont *AciController) updateServiceCondition(service *v1.Service, success bool, reason string, message string) error {
1✔
1525
        conditionType := "LbIpamAllocation"
1✔
1526

1✔
1527
        var err error
1✔
1528
        var condition metav1.Condition
1✔
1529
        if success {
2✔
1530
                condition.Status = metav1.ConditionTrue
1✔
1531
        } else {
2✔
1532
                condition.Status = metav1.ConditionFalse
1✔
1533
                condition.Message = message
1✔
1534
        }
1✔
1535
        condition.Type = conditionType
1✔
1536
        condition.Reason = reason
1✔
1537
        condition.LastTransitionTime = metav1.Time{time.Now()}
1✔
1538
        for _, cond := range service.Status.Conditions {
2✔
1539
                if cond.Type == conditionType &&
1✔
1540
                        cond.Status == condition.Status &&
1✔
1541
                        cond.Message == condition.Message &&
1✔
1542
                        cond.Reason == condition.Reason {
2✔
1543
                        return nil
1✔
1544
                }
1✔
1545
        }
1546

1547
        service.Status.Conditions = removeAllConditions(service.Status.Conditions, conditionType)
1✔
1548
        service.Status.Conditions = append(service.Status.Conditions, condition)
1✔
1549
        _, err = cont.updateServiceStatus(service)
1✔
1550
        return err
1✔
1551
}
1552

1553
func (cont *AciController) validateRequestedIps(lbIpList []string) (net.IP, net.IP, bool) {
1✔
1554
        var ipv4, ipv6 net.IP
1✔
1555
        for _, lbIp := range lbIpList {
2✔
1556
                ip := net.ParseIP(lbIp)
1✔
1557
                if ip != nil {
2✔
1558
                        if ip.To4() != nil {
2✔
1559
                                if ipv4.Equal(net.IP{}) {
2✔
1560
                                        ipv4 = ip
1✔
1561
                                } else {
2✔
1562
                                        cont.log.Error("Annotation should have only one ipv4")
1✔
1563
                                        return ipv4, ipv6, false
1✔
1564
                                }
1✔
1565
                        } else if ip.To16() != nil {
2✔
1566
                                if ipv6.Equal(net.IP{}) {
2✔
1567
                                        ipv6 = ip
1✔
1568
                                } else {
2✔
1569
                                        cont.log.Error("Annotation should have only one ipv6")
1✔
1570
                                        return ipv4, ipv6, false
1✔
1571
                                }
1✔
1572
                        }
1573
                }
1574
        }
1575
        return ipv4, ipv6, true
1✔
1576
}
1577

1578
func (cont *AciController) returnUnusedStaticIngressIps(staticIngressIps, requestedIps []net.IP) {
1✔
1579
        for _, staticIp := range staticIngressIps {
2✔
1580
                found := false
1✔
1581
                for _, reqIp := range requestedIps {
2✔
1582
                        if reqIp.Equal(staticIp) {
2✔
1583
                                found = true
1✔
1584
                        }
1✔
1585
                }
1586
                if !found {
1✔
1587
                        returnIps(cont.staticServiceIps, []net.IP{staticIp})
×
1588
                }
×
1589
        }
1590
}
1591

1592
func (cont *AciController) allocateServiceIps(servicekey string,
1593
        service *v1.Service) bool {
1✔
1594
        logger := serviceLogger(cont.log, service)
1✔
1595
        cont.indexMutex.Lock()
1✔
1596
        meta, ok := cont.serviceMetaCache[servicekey]
1✔
1597
        if !ok {
2✔
1598
                meta = &serviceMeta{}
1✔
1599
                cont.serviceMetaCache[servicekey] = meta
1✔
1600

1✔
1601
                // Read any existing IPs and attempt to allocate them to the pod
1✔
1602
                for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1603
                        ip := net.ParseIP(ingress.IP)
1✔
1604
                        if ip == nil {
1✔
1605
                                continue
×
1606
                        }
1607
                        if ip.To4() != nil {
2✔
1608
                                if cont.serviceIps.GetV4IpCache()[0].RemoveIp(ip) {
2✔
1609
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1610
                                } else if cont.staticServiceIps.V4.RemoveIp(ip) {
3✔
1611
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1612
                                }
1✔
1613
                        } else if ip.To16() != nil {
2✔
1614
                                if cont.serviceIps.GetV6IpCache()[0].RemoveIp(ip) {
2✔
1615
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1616
                                } else if cont.staticServiceIps.V6.RemoveIp(ip) {
3✔
1617
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1618
                                }
1✔
1619
                        }
1620
                }
1621
        }
1622

1623
        if !cont.serviceSyncEnabled {
2✔
1624
                cont.indexMutex.Unlock()
1✔
1625
                return false
1✔
1626
        }
1✔
1627

1628
        var requestedIps []net.IP
1✔
1629
        // try to give the requested load balancer IP to the pod
1✔
1630
        lbIps, ok := service.ObjectMeta.Annotations[metadata.LbIpAnnotation]
1✔
1631
        if ok {
2✔
1632
                lbIpList := strings.Split(lbIps, ",")
1✔
1633
                ipv4, ipv6, valid := cont.validateRequestedIps(lbIpList)
1✔
1634
                if valid {
2✔
1635
                        if ipv4 != nil {
2✔
1636
                                requestedIps = append(requestedIps, ipv4)
1✔
1637
                        }
1✔
1638
                        if ipv6 != nil {
2✔
1639
                                requestedIps = append(requestedIps, ipv6)
1✔
1640
                        }
1✔
1641
                } else {
1✔
1642
                        cont.returnServiceIps(meta.ingressIps)
1✔
1643
                        cont.log.Error("Invalid LB IP annotation for service ", servicekey)
1✔
1644
                        err := cont.updateServiceCondition(service, false, "InvalidAnnotation", "Invalid Loadbalancer IP annotation")
1✔
1645
                        if err != nil {
1✔
1646
                                logger.Error("Failed to update service status : ", err)
×
1647
                                cont.indexMutex.Unlock()
×
1648
                                return true
×
1649
                        }
×
1650
                        cont.indexMutex.Unlock()
1✔
1651
                        return false
1✔
1652
                }
1653
        } else {
1✔
1654
                requestedIp := net.ParseIP(service.Spec.LoadBalancerIP)
1✔
1655
                if requestedIp != nil {
2✔
1656
                        requestedIps = append(requestedIps, requestedIp)
1✔
1657
                }
1✔
1658
        }
1659
        if len(requestedIps) > 0 {
2✔
1660
                meta.requestedIps = []net.IP{}
1✔
1661
                for _, requestedIp := range requestedIps {
2✔
1662
                        hasRequestedIp := false
1✔
1663
                        for _, ip := range meta.staticIngressIps {
2✔
1664
                                if reflect.DeepEqual(requestedIp, ip) {
2✔
1665
                                        hasRequestedIp = true
1✔
1666
                                }
1✔
1667
                        }
1668
                        if !hasRequestedIp {
2✔
1669
                                if requestedIp.To4() != nil &&
1✔
1670
                                        cont.staticServiceIps.V4.RemoveIp(requestedIp) {
2✔
1671
                                        hasRequestedIp = true
1✔
1672
                                } else if requestedIp.To16() != nil &&
2✔
1673
                                        cont.staticServiceIps.V6.RemoveIp(requestedIp) {
2✔
1674
                                        hasRequestedIp = true
1✔
1675
                                }
1✔
1676
                        }
1677
                        if hasRequestedIp {
2✔
1678
                                meta.requestedIps = append(meta.requestedIps, requestedIp)
1✔
1679
                        }
1✔
1680
                }
1681
                cont.returnUnusedStaticIngressIps(meta.staticIngressIps, meta.requestedIps)
1✔
1682
                meta.staticIngressIps = meta.requestedIps
1✔
1683
                cont.returnServiceIps(meta.ingressIps)
1✔
1684
                meta.ingressIps = nil
1✔
1685
                // If no requested ips are allocatable
1✔
1686
                if len(meta.requestedIps) < 1 {
2✔
1687
                        logger.Error("No Requested Ip addresses available for service ", servicekey)
1✔
1688
                        err := cont.updateServiceCondition(service, false, "RequestedIpsNotAllocatable", "The requested ips for loadbalancer service are not available or not in extern static range")
1✔
1689
                        if err != nil {
1✔
1690
                                cont.indexMutex.Unlock()
×
1691
                                logger.Error("Failed to update service status: ", err)
×
1692
                                return true
×
1693
                        }
×
1694
                        cont.indexMutex.Unlock()
1✔
1695
                        return false
1✔
1696
                }
1697
        } else if len(meta.requestedIps) > 0 {
1✔
1698
                meta.requestedIps = nil
×
1699
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
×
1700
                meta.staticIngressIps = nil
×
1701
        }
×
1702
        ingressIps := make([]net.IP, 0)
1✔
1703
        ingressIps = append(ingressIps, meta.ingressIps...)
1✔
1704
        ingressIps = append(ingressIps, meta.staticIngressIps...)
1✔
1705

1✔
1706
        var ipv4, ipv6 net.IP
1✔
1707
        for _, ip := range ingressIps {
2✔
1708
                if ip.To4() != nil {
2✔
1709
                        ipv4 = ip
1✔
1710
                } else if ip.To16() != nil {
3✔
1711
                        ipv6 = ip
1✔
1712
                }
1✔
1713
        }
1714
        var clusterIPv4, clusterIPv6 net.IP
1✔
1715
        clusterIPs := append([]string{service.Spec.ClusterIP}, service.Spec.ClusterIPs...)
1✔
1716
        for _, ipStr := range clusterIPs {
2✔
1717
                ip := net.ParseIP(ipStr)
1✔
1718
                if ip == nil {
1✔
1719
                        continue
×
1720
                }
1721
                if ip.To4() != nil && clusterIPv4 == nil {
2✔
1722
                        clusterIPv4 = ip
1✔
1723
                } else if ip.To16() != nil && strings.Contains(ip.String(), ":") && clusterIPv6 == nil {
3✔
1724
                        clusterIPv6 = ip
1✔
1725
                }
1✔
1726
        }
1727
        if clusterIPv4 != nil && ipv4 == nil {
2✔
1728
                if len(requestedIps) < 1 {
2✔
1729
                        ipv4, _ = cont.serviceIps.AllocateIp(true)
1✔
1730
                        if ipv4 != nil {
2✔
1731
                                ingressIps = append(ingressIps, ipv4)
1✔
1732
                        }
1✔
1733
                }
1734
        } else if clusterIPv4 == nil && ipv4 != nil {
1✔
1735
                cont.removeIpFromIngressIPList(&ingressIps, ipv4)
×
1736
        }
×
1737

1738
        if clusterIPv6 != nil && ipv6 == nil {
2✔
1739
                if len(requestedIps) < 1 {
2✔
1740
                        ipv6, _ = cont.serviceIps.AllocateIp(false)
1✔
1741
                        if ipv6 != nil {
2✔
1742
                                ingressIps = append(ingressIps, ipv6)
1✔
1743
                        }
1✔
1744
                }
1745
        } else if clusterIPv6 == nil && ipv6 != nil {
1✔
1746
                cont.removeIpFromIngressIPList(&ingressIps, ipv6)
×
1747
        }
×
1748

1749
        meta.ingressIps = ingressIps
1✔
1750
        if ipv4 == nil && ipv6 == nil {
2✔
1751
                logger.Error("No IP addresses available for service")
1✔
1752
                cont.indexMutex.Unlock()
1✔
1753
                return true
1✔
1754
        }
1✔
1755
        cont.indexMutex.Unlock()
1✔
1756
        var newIngress []v1.LoadBalancerIngress
1✔
1757
        for _, ip := range meta.ingressIps {
2✔
1758
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
1759
        }
1✔
1760

1761
        if !reflect.DeepEqual(newIngress, service.Status.LoadBalancer.Ingress) {
2✔
1762
                service.Status.LoadBalancer.Ingress = newIngress
1✔
1763

1✔
1764
                _, err := cont.updateServiceStatus(service)
1✔
1765
                if err != nil {
1✔
1766
                        logger.Error("Failed to update service: ", err)
×
1767
                        return true
×
1768
                } else {
1✔
1769
                        logger.WithFields(logrus.Fields{
1✔
1770
                                "status": service.Status.LoadBalancer.Ingress,
1✔
1771
                        }).Info("Updated service load balancer status")
1✔
1772
                }
1✔
1773
        }
1774

1775
        success := true
1✔
1776
        reason := "Success"
1✔
1777
        message := ""
1✔
1778
        if len(requestedIps) > 0 && len(requestedIps) != len(meta.ingressIps) {
1✔
1779
                success = false
×
1780
                reason = "OneIpNotAllocatable"
×
1781
                message = "One of the requested Ips is not allocatable"
×
1782
        }
×
1783
        err := cont.updateServiceCondition(service, success, reason, message)
1✔
1784
        if err != nil {
1✔
1785
                logger.Error("Failed to update service status: ", err)
×
1786
                return true
×
1787
        }
×
1788
        return false
1✔
1789
}
1790

1791
func (cont *AciController) handleServiceDelete(servicekey string) bool {
1✔
1792
        if cont.config.ChainedMode {
1✔
1793
                return false
×
1794
        }
×
1795
        cont.clearLbService(servicekey)
1✔
1796
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("service-vmm",
1✔
1797
                servicekey))
1✔
1798
        return false
1✔
1799
}
1800

1801
func (cont *AciController) handleServiceUpdate(service *v1.Service) bool {
1✔
1802
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
1803
        if err != nil {
1✔
1804
                serviceLogger(cont.log, service).
×
1805
                        Error("Could not create service key: ", err)
×
1806
                return false
×
1807
        }
×
1808
        if cont.config.ChainedMode {
1✔
1809
                return false
×
1810
        }
×
1811
        var requeue bool
1✔
1812
        isLoadBalancer := service.Spec.Type == v1.ServiceTypeLoadBalancer
1✔
1813
        aciLB := cont.config.LBType == lbTypeAci
1✔
1814
        if isLoadBalancer && aciLB {
2✔
1815
                if *cont.config.AllocateServiceIps {
2✔
1816
                        requeue = cont.allocateServiceIps(servicekey, service)
1✔
1817
                }
1✔
1818
                cont.indexMutex.Lock()
1✔
1819
                if cont.serviceSyncEnabled {
2✔
1820
                        cont.indexMutex.Unlock()
1✔
1821
                        err = cont.updateServiceDeviceInstance(servicekey, service)
1✔
1822
                        if err != nil {
1✔
1823
                                serviceLogger(cont.log, service).
×
1824
                                        Error("Failed to update service device Instance: ", err)
×
1825
                                return true
×
1826
                        }
×
1827
                } else {
1✔
1828
                        cont.indexMutex.Unlock()
1✔
1829
                }
1✔
1830
        } else if aciLB {
2✔
1831
                cont.clearLbService(servicekey)
1✔
1832
        }
1✔
1833
        cont.writeApicSvc(servicekey, service)
1✔
1834
        return requeue
1✔
1835
}
1836

1837
func (cont *AciController) clearLbService(servicekey string) {
1✔
1838
        cont.indexMutex.Lock()
1✔
1839
        if meta, ok := cont.serviceMetaCache[servicekey]; ok {
2✔
1840
                cont.returnServiceIps(meta.ingressIps)
1✔
1841
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
1✔
1842
                delete(cont.serviceMetaCache, servicekey)
1✔
1843
        }
1✔
1844
        cont.indexMutex.Unlock()
1✔
1845
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("svc", servicekey))
1✔
1846
}
1847

1848
func getEndpointsIps(endpoints *v1.Endpoints) map[string]bool {
1✔
1849
        ips := make(map[string]bool)
1✔
1850
        for _, subset := range endpoints.Subsets {
2✔
1851
                for _, addr := range subset.Addresses {
2✔
1852
                        ips[addr.IP] = true
1✔
1853
                }
1✔
1854
                for _, addr := range subset.NotReadyAddresses {
1✔
1855
                        ips[addr.IP] = true
×
1856
                }
×
1857
        }
1858
        return ips
1✔
1859
}
1860

1861
func getServiceTargetPorts(service *v1.Service) map[string]targetPort {
1✔
1862
        ports := make(map[string]targetPort)
1✔
1863
        for _, port := range service.Spec.Ports {
2✔
1864
                portNum := port.TargetPort.IntValue()
1✔
1865
                if portNum <= 0 {
2✔
1866
                        portNum = int(port.Port)
1✔
1867
                }
1✔
1868
                key := portProto(&port.Protocol) + "-num-" + strconv.Itoa(portNum)
1✔
1869
                ports[key] = targetPort{
1✔
1870
                        proto: port.Protocol,
1✔
1871
                        ports: []int{portNum},
1✔
1872
                }
1✔
1873
        }
1874
        return ports
1✔
1875
}
1876

1877
func (cont *AciController) endpointsAdded(obj interface{}) {
1✔
1878
        endpoints := obj.(*v1.Endpoints)
1✔
1879
        servicekey, err := cache.MetaNamespaceKeyFunc(obj.(*v1.Endpoints))
1✔
1880
        if err != nil {
1✔
1881
                cont.log.Error("Could not create service key: ", err)
×
1882
                return
×
1883
        }
×
1884

1885
        ips := getEndpointsIps(endpoints)
1✔
1886
        cont.indexMutex.Lock()
1✔
1887
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
1888
        cont.queueIPNetPolUpdates(ips)
1✔
1889
        cont.indexMutex.Unlock()
1✔
1890

1✔
1891
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
1892

1✔
1893
        cont.queueServiceUpdateByKey(servicekey)
1✔
1894
}
1895

1896
func (cont *AciController) endpointsDeleted(obj interface{}) {
1✔
1897
        endpoints, isEndpoints := obj.(*v1.Endpoints)
1✔
1898
        if !isEndpoints {
1✔
1899
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
1900
                if !ok {
×
1901
                        cont.log.Error("Received unexpected object: ", obj)
×
1902
                        return
×
1903
                }
×
1904
                endpoints, ok = deletedState.Obj.(*v1.Endpoints)
×
1905
                if !ok {
×
1906
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpoints object: ", deletedState.Obj)
×
1907
                        return
×
1908
                }
×
1909
        }
1910
        servicekey, err := cache.MetaNamespaceKeyFunc(endpoints)
1✔
1911
        if err != nil {
1✔
1912
                cont.log.Error("Could not create service key: ", err)
×
1913
                return
×
1914
        }
×
1915

1916
        ips := getEndpointsIps(endpoints)
1✔
1917
        cont.indexMutex.Lock()
1✔
1918
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
1919
        cont.queueIPNetPolUpdates(ips)
1✔
1920
        cont.indexMutex.Unlock()
1✔
1921

1✔
1922
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
1923

1✔
1924
        cont.queueServiceUpdateByKey(servicekey)
1✔
1925
}
1926

1927
func (cont *AciController) endpointsUpdated(oldEps, newEps interface{}) {
1✔
1928
        oldendpoints := oldEps.(*v1.Endpoints)
1✔
1929
        newendpoints := newEps.(*v1.Endpoints)
1✔
1930
        servicekey, err := cache.MetaNamespaceKeyFunc(newendpoints)
1✔
1931
        if err != nil {
1✔
1932
                cont.log.Error("Could not create service key: ", err)
×
1933
                return
×
1934
        }
×
1935

1936
        oldIps := getEndpointsIps(oldendpoints)
1✔
1937
        newIps := getEndpointsIps(newendpoints)
1✔
1938
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
1939
                cont.indexMutex.Lock()
1✔
1940
                cont.queueIPNetPolUpdates(oldIps)
1✔
1941
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
1942
                cont.queueIPNetPolUpdates(newIps)
1✔
1943
                cont.indexMutex.Unlock()
1✔
1944
        }
1✔
1945

1946
        if !reflect.DeepEqual(oldendpoints.Subsets, newendpoints.Subsets) {
2✔
1947
                cont.queueEndpointsNetPolUpdates(oldendpoints)
1✔
1948
                cont.queueEndpointsNetPolUpdates(newendpoints)
1✔
1949
        }
1✔
1950

1951
        cont.queueServiceUpdateByKey(servicekey)
1✔
1952
}
1953

1954
func (cont *AciController) serviceAdded(obj interface{}) {
1✔
1955
        service := obj.(*v1.Service)
1✔
1956
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
1957
        if err != nil {
1✔
1958
                serviceLogger(cont.log, service).
×
1959
                        Error("Could not create service key: ", err)
×
1960
                return
×
1961
        }
×
1962

1963
        ports := getServiceTargetPorts(service)
1✔
1964
        cont.indexMutex.Lock()
1✔
1965
        cont.queuePortNetPolUpdates(ports)
1✔
1966
        cont.updateTargetPortIndex(true, servicekey, nil, ports)
1✔
1967
        cont.indexMutex.Unlock()
1✔
1968

1✔
1969
        cont.queueServiceUpdateByKey(servicekey)
1✔
1970
}
1971

1972
func (cont *AciController) serviceUpdated(oldSvc, newSvc interface{}) {
1✔
1973
        oldservice := oldSvc.(*v1.Service)
1✔
1974
        newservice := newSvc.(*v1.Service)
1✔
1975
        servicekey, err := cache.MetaNamespaceKeyFunc(newservice)
1✔
1976
        if err != nil {
1✔
1977
                serviceLogger(cont.log, newservice).
×
1978
                        Error("Could not create service key: ", err)
×
1979
                return
×
1980
        }
×
1981
        oldPorts := getServiceTargetPorts(oldservice)
1✔
1982
        newPorts := getServiceTargetPorts(newservice)
1✔
1983
        if !reflect.DeepEqual(oldPorts, newPorts) {
1✔
1984
                cont.indexMutex.Lock()
×
1985
                cont.queuePortNetPolUpdates(oldPorts)
×
1986
                cont.updateTargetPortIndex(true, servicekey, oldPorts, newPorts)
×
1987
                cont.queuePortNetPolUpdates(newPorts)
×
1988
                cont.indexMutex.Unlock()
×
1989
        }
×
1990
        cont.queueServiceUpdateByKey(servicekey)
1✔
1991
}
1992

1993
func (cont *AciController) serviceDeleted(obj interface{}) {
1✔
1994
        service, isService := obj.(*v1.Service)
1✔
1995
        if !isService {
1✔
1996
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
1997
                if !ok {
×
1998
                        serviceLogger(cont.log, service).
×
1999
                                Error("Received unexpected object: ", obj)
×
2000
                        return
×
2001
                }
×
2002
                service, ok = deletedState.Obj.(*v1.Service)
×
2003
                if !ok {
×
2004
                        serviceLogger(cont.log, service).
×
2005
                                Error("DeletedFinalStateUnknown contained non-Services object: ", deletedState.Obj)
×
2006
                        return
×
2007
                }
×
2008
        }
2009
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2010
        if err != nil {
1✔
2011
                serviceLogger(cont.log, service).
×
2012
                        Error("Could not create service key: ", err)
×
2013
                return
×
2014
        }
×
2015

2016
        ports := getServiceTargetPorts(service)
1✔
2017
        cont.indexMutex.Lock()
1✔
2018
        cont.updateTargetPortIndex(true, servicekey, ports, nil)
1✔
2019
        cont.queuePortNetPolUpdates(ports)
1✔
2020
        delete(cont.snatServices, servicekey)
1✔
2021
        cont.indexMutex.Unlock()
1✔
2022

1✔
2023
        deletedServiceKey := "DELETED_" + servicekey
1✔
2024
        cont.queueServiceUpdateByKey(deletedServiceKey)
1✔
2025
}
2026

2027
func (cont *AciController) serviceFullSync() {
1✔
2028
        cache.ListAll(cont.serviceIndexer, labels.Everything(),
1✔
2029
                func(sobj interface{}) {
2✔
2030
                        cont.queueServiceUpdate(sobj.(*v1.Service))
1✔
2031
                })
1✔
2032
}
2033

2034
func (cont *AciController) getEndpointSliceIps(endpointSlice *discovery.EndpointSlice) map[string]bool {
1✔
2035
        ips := make(map[string]bool)
1✔
2036
        for _, endpoints := range endpointSlice.Endpoints {
2✔
2037
                for _, addr := range endpoints.Addresses {
2✔
2038
                        ips[addr] = true
1✔
2039
                }
1✔
2040
        }
2041
        return ips
1✔
2042
}
2043

2044
func (cont *AciController) notReadyEndpointPresent(endpointSlice *discovery.EndpointSlice) bool {
×
2045
        for _, endpoints := range endpointSlice.Endpoints {
×
2046
                if (endpoints.Conditions.Ready != nil && !*endpoints.Conditions.Ready) &&
×
2047
                        (endpoints.Conditions.Terminating == nil || !*endpoints.Conditions.Terminating) {
×
2048
                        return true
×
2049
                }
×
2050
        }
2051
        return false
×
2052
}
2053

2054
func (cont *AciController) getEndpointSliceEpIps(endpoints *discovery.Endpoint) map[string]bool {
×
2055
        ips := make(map[string]bool)
×
2056
        for _, addr := range endpoints.Addresses {
×
2057
                ips[addr] = true
×
2058
        }
×
2059
        return ips
×
2060
}
2061

2062
func (cont *AciController) processDelayedEpSlices() {
1✔
2063
        var processEps []DelayedEpSlice
1✔
2064
        cont.indexMutex.Lock()
1✔
2065
        for i := 0; i < len(cont.delayedEpSlices); i++ {
1✔
2066
                delayedepslice := cont.delayedEpSlices[i]
×
2067
                if time.Now().After(delayedepslice.DelayedTime) {
×
2068
                        var toprocess DelayedEpSlice
×
2069
                        err := util.DeepCopyObj(&delayedepslice, &toprocess)
×
2070
                        if err != nil {
×
2071
                                cont.log.Error(err)
×
2072
                                continue
×
2073
                        }
2074
                        processEps = append(processEps, toprocess)
×
2075
                        cont.delayedEpSlices = append(cont.delayedEpSlices[:i], cont.delayedEpSlices[i+1:]...)
×
2076
                }
2077
        }
2078

2079
        cont.indexMutex.Unlock()
1✔
2080
        for _, epslice := range processEps {
1✔
2081
                //ignore the epslice if newly added endpoint is not ready
×
2082
                if cont.notReadyEndpointPresent(epslice.NewEpSlice) {
×
2083
                        cont.log.Debug("Ignoring the update as the new endpoint is not ready : ", epslice.NewEpSlice)
×
2084
                } else {
×
2085
                        cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice)
×
2086
                        cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice)
×
2087
                }
×
2088
        }
2089
}
2090

2091
func (cont *AciController) endpointSliceAdded(obj interface{}) {
1✔
2092
        endpointslice, ok := obj.(*discovery.EndpointSlice)
1✔
2093
        if !ok {
1✔
2094
                cont.log.Error("error processing Endpointslice object: ", obj)
×
2095
                return
×
2096
        }
×
2097
        servicekey, valid := getServiceKey(endpointslice)
1✔
2098
        if !valid {
1✔
2099
                return
×
2100
        }
×
2101
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2102
        cont.indexMutex.Lock()
1✔
2103
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2104
        cont.queueIPNetPolUpdates(ips)
1✔
2105
        cont.indexMutex.Unlock()
1✔
2106

1✔
2107
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2108

1✔
2109
        cont.queueServiceUpdateByKey(servicekey)
1✔
2110
        cont.log.Info("EndPointSlice Object Added: ", servicekey)
1✔
2111
}
2112

2113
func (cont *AciController) endpointSliceDeleted(obj interface{}) {
1✔
2114
        endpointslice, isEndpointslice := obj.(*discovery.EndpointSlice)
1✔
2115
        if !isEndpointslice {
1✔
2116
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2117
                if !ok {
×
2118
                        cont.log.Error("Received unexpected object: ", obj)
×
2119
                        return
×
2120
                }
×
2121
                endpointslice, ok = deletedState.Obj.(*discovery.EndpointSlice)
×
2122
                if !ok {
×
2123
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpointslice object: ", deletedState.Obj)
×
2124
                        return
×
2125
                }
×
2126
        }
2127
        servicekey, valid := getServiceKey(endpointslice)
1✔
2128
        if !valid {
1✔
2129
                return
×
2130
        }
×
2131
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2132
        cont.indexMutex.Lock()
1✔
2133
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2134
        cont.queueIPNetPolUpdates(ips)
1✔
2135
        cont.indexMutex.Unlock()
1✔
2136
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2137
        cont.queueServiceUpdateByKey(servicekey)
1✔
2138
}
2139

2140
// Checks if the given service is present in the user configured list of services
2141
// for pbr delay and if present, returns the servie specific delay if configured
2142
func (cont *AciController) svcInAddDelayList(name, ns string) (int, bool) {
×
2143
        for _, svc := range cont.config.ServiceGraphEndpointAddDelay.Services {
×
2144
                if svc.Name == name && svc.Namespace == ns {
×
2145
                        return svc.Delay, true
×
2146
                }
×
2147
        }
2148
        return 0, false
×
2149
}
2150

2151
// Check if the endpointslice update notification has any deletion of enpoint
2152
func (cont *AciController) isDeleteEndpointSlice(oldendpointslice, newendpointslice *discovery.EndpointSlice) bool {
×
2153
        del := false
×
2154

×
2155
        // if any endpoint is removed from endpontslice
×
2156
        if len(newendpointslice.Endpoints) < len(oldendpointslice.Endpoints) {
×
2157
                del = true
×
2158
        }
×
2159

2160
        if !del {
×
2161
                // if any one of the endpoint is in terminating state
×
2162
                for _, endpoint := range newendpointslice.Endpoints {
×
2163
                        if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
×
2164
                                del = true
×
2165
                                break
×
2166
                        }
2167
                }
2168
        }
2169
        if !del {
×
2170
                // if any one of endpoint moved from ready state to not-ready state
×
2171
                for ix := range oldendpointslice.Endpoints {
×
2172
                        oldips := cont.getEndpointSliceEpIps(&oldendpointslice.Endpoints[ix])
×
2173
                        for newIx := range newendpointslice.Endpoints {
×
2174
                                newips := cont.getEndpointSliceEpIps(&newendpointslice.Endpoints[newIx])
×
2175
                                if reflect.DeepEqual(oldips, newips) {
×
2176
                                        if (oldendpointslice.Endpoints[ix].Conditions.Ready != nil && *oldendpointslice.Endpoints[ix].Conditions.Ready) &&
×
2177
                                                (newendpointslice.Endpoints[newIx].Conditions.Ready != nil && !*newendpointslice.Endpoints[newIx].Conditions.Ready) {
×
2178
                                                del = true
×
2179
                                        }
×
2180
                                        break
×
2181
                                }
2182
                        }
2183
                }
2184
        }
2185
        return del
×
2186
}
2187

2188
func (cont *AciController) doendpointSliceUpdatedDelay(oldendpointslice *discovery.EndpointSlice,
2189
        newendpointslice *discovery.EndpointSlice) {
×
2190
        svc, ns, valid := getServiceNameAndNs(newendpointslice)
×
2191
        if !valid {
×
2192
                return
×
2193
        }
×
2194
        svckey, valid := getServiceKey(newendpointslice)
×
2195
        if !valid {
×
2196
                return
×
2197
        }
×
2198
        delay := cont.config.ServiceGraphEndpointAddDelay.Delay
×
2199
        svcDelay, exists := cont.svcInAddDelayList(svc, ns)
×
2200
        if svcDelay > 0 {
×
2201
                delay = svcDelay
×
2202
        }
×
2203
        delayedsvc := exists && delay > 0
×
2204
        if delayedsvc {
×
2205
                cont.log.Debug("Delay of ", delay, " seconds is applicable for svc :", svc, " in ns: ", ns)
×
2206
                var delayedepslice DelayedEpSlice
×
2207
                delayedepslice.OldEpSlice = oldendpointslice
×
2208
                delayedepslice.ServiceKey = svckey
×
2209
                delayedepslice.NewEpSlice = newendpointslice
×
2210
                currentTime := time.Now()
×
2211
                delayedepslice.DelayedTime = currentTime.Add(time.Duration(delay) * time.Second)
×
2212
                cont.indexMutex.Lock()
×
2213
                cont.delayedEpSlices = append(cont.delayedEpSlices, &delayedepslice)
×
2214
                cont.indexMutex.Unlock()
×
2215
        } else {
×
2216
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2217
        }
×
2218

2219
        if delayedsvc && cont.isDeleteEndpointSlice(oldendpointslice, newendpointslice) {
×
2220
                cont.log.Debug("Proceeding by ignoring delay as the update is due to delete of endpoint")
×
2221
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2222
        }
×
2223
}
2224

2225
func (cont *AciController) endpointSliceUpdated(oldobj, newobj interface{}) {
1✔
2226
        oldendpointslice, ok := oldobj.(*discovery.EndpointSlice)
1✔
2227
        if !ok {
1✔
2228
                cont.log.Error("error processing Endpointslice object: ", oldobj)
×
2229
                return
×
2230
        }
×
2231
        newendpointslice, ok := newobj.(*discovery.EndpointSlice)
1✔
2232
        if !ok {
1✔
2233
                cont.log.Error("error processing Endpointslice object: ", newobj)
×
2234
                return
×
2235
        }
×
2236
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2237
                cont.doendpointSliceUpdatedDelay(oldendpointslice, newendpointslice)
×
2238
        } else {
1✔
2239
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
1✔
2240
        }
1✔
2241
}
2242

2243
func (cont *AciController) doendpointSliceUpdated(oldendpointslice *discovery.EndpointSlice,
2244
        newendpointslice *discovery.EndpointSlice) {
1✔
2245
        servicekey, valid := getServiceKey(newendpointslice)
1✔
2246
        if !valid {
1✔
2247
                return
×
2248
        }
×
2249
        oldIps := cont.getEndpointSliceIps(oldendpointslice)
1✔
2250
        newIps := cont.getEndpointSliceIps(newendpointslice)
1✔
2251
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2252
                cont.indexMutex.Lock()
1✔
2253
                cont.queueIPNetPolUpdates(oldIps)
1✔
2254
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2255
                cont.queueIPNetPolUpdates(newIps)
1✔
2256
                cont.indexMutex.Unlock()
1✔
2257
        }
1✔
2258

2259
        if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) {
2✔
2260
                cont.queueEndpointSliceNetPolUpdates(oldendpointslice)
1✔
2261
                cont.queueEndpointSliceNetPolUpdates(newendpointslice)
1✔
2262
        }
1✔
2263
        cont.log.Debug("EndPointSlice Object Update: ", servicekey)
1✔
2264
        cont.queueServiceUpdateByKey(servicekey)
1✔
2265
}
2266

2267
func (cont *AciController) queueEndpointSliceNetPolUpdates(endpointslice *discovery.EndpointSlice) {
1✔
2268
        for _, endpoint := range endpointslice.Endpoints {
2✔
2269
                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" ||
1✔
2270
                        endpoint.TargetRef.Namespace == "" || endpoint.TargetRef.Name == "" {
2✔
2271
                        continue
1✔
2272
                }
2273
                if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
1✔
2274
                        continue
×
2275
                }
2276
                podkey := endpoint.TargetRef.Namespace + "/" + endpoint.TargetRef.Name
1✔
2277
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
2278
                ps := make(map[string]bool)
1✔
2279
                for _, npkey := range npkeys {
2✔
2280
                        cont.queueNetPolUpdateByKey(npkey)
1✔
2281
                }
1✔
2282
                // Process if the  any matching namedport wildcard policy is present
2283
                // ignore np already processed policies
2284
                cont.queueMatchingNamedNp(ps, podkey)
1✔
2285
        }
2286
}
2287

2288
func getServiceKey(endPointSlice *discovery.EndpointSlice) (string, bool) {
1✔
2289
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
1✔
2290
        if !ok {
1✔
2291
                return "", false
×
2292
        }
×
2293
        return endPointSlice.ObjectMeta.Namespace + "/" + serviceName, true
1✔
2294
}
2295

2296
func getServiceNameAndNs(endPointSlice *discovery.EndpointSlice) (string, string, bool) {
×
2297
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
×
2298
        if !ok {
×
2299
                return "", "", false
×
2300
        }
×
2301
        return serviceName, endPointSlice.ObjectMeta.Namespace, true
×
2302
}
2303

2304
// can be called with index lock
2305
func (sep *serviceEndpoint) UpdateServicesForNode(nodename string) {
1✔
2306
        cont := sep.cont
1✔
2307
        cache.ListAll(cont.endpointsIndexer, labels.Everything(),
1✔
2308
                func(endpointsobj interface{}) {
2✔
2309
                        endpoints := endpointsobj.(*v1.Endpoints)
1✔
2310
                        for _, subset := range endpoints.Subsets {
2✔
2311
                                for _, addr := range subset.Addresses {
2✔
2312
                                        if addr.NodeName != nil && *addr.NodeName == nodename {
2✔
2313
                                                servicekey, err :=
1✔
2314
                                                        cache.MetaNamespaceKeyFunc(endpointsobj.(*v1.Endpoints))
1✔
2315
                                                if err != nil {
1✔
2316
                                                        cont.log.Error("Could not create endpoints key: ", err)
×
2317
                                                        return
×
2318
                                                }
×
2319
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2320
                                                return
1✔
2321
                                        }
2322
                                }
2323
                        }
2324
                })
2325
}
2326

2327
func (seps *serviceEndpointSlice) UpdateServicesForNode(nodename string) {
1✔
2328
        // 1. List all the endpointslice and check for matching nodename
1✔
2329
        // 2. if it matches trigger the Service update and mark it visited
1✔
2330
        cont := seps.cont
1✔
2331
        visited := make(map[string]bool)
1✔
2332
        cache.ListAll(cont.endpointSliceIndexer, labels.Everything(),
1✔
2333
                func(endpointSliceobj interface{}) {
2✔
2334
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2335
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2336
                                if endpoint.NodeName != nil && *endpoint.NodeName == nodename {
2✔
2337
                                        servicekey, valid := getServiceKey(endpointSlices)
1✔
2338
                                        if !valid {
1✔
2339
                                                return
×
2340
                                        }
×
2341
                                        if _, ok := visited[servicekey]; !ok {
2✔
2342
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2343
                                                visited[servicekey] = true
1✔
2344
                                                return
1✔
2345
                                        }
1✔
2346
                                }
2347
                        }
2348
                })
2349
}
2350
func (cont *AciController) setNodeMap(nodeMap map[string]*metadata.ServiceEndpoint, nodeName string) {
1✔
2351
        nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
2352
        if !ok {
2✔
2353
                return
1✔
2354
        }
1✔
2355
        _, ok = cont.fabricPathForNode(nodeName)
1✔
2356
        if !ok {
2✔
2357
                return
1✔
2358
        }
1✔
2359
        nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
2360
}
2361

2362
// 2 cases when epslices corresponding to given service is presnt in delayedEpSlices:
2363
//  1. endpoint not present in delayedEpSlices of the service
2364
//  2. endpoint present in delayedEpSlices of the service but in not ready state
2365
//
2366
// indexMutex lock must be acquired before calling the function
2367
func (cont *AciController) isDelayedEndpoint(endpoint *discovery.Endpoint, svckey string) bool {
×
2368
        delayed := false
×
2369
        endpointips := cont.getEndpointSliceEpIps(endpoint)
×
2370
        for _, delayedepslices := range cont.delayedEpSlices {
×
2371
                if delayedepslices.ServiceKey == svckey {
×
2372
                        var found bool
×
2373
                        epslice := delayedepslices.OldEpSlice
×
2374
                        for ix := range epslice.Endpoints {
×
2375
                                epips := cont.getEndpointSliceEpIps(&epslice.Endpoints[ix])
×
2376
                                if reflect.DeepEqual(endpointips, epips) {
×
2377
                                        // case 2
×
2378
                                        if epslice.Endpoints[ix].Conditions.Ready != nil && !*epslice.Endpoints[ix].Conditions.Ready {
×
2379
                                                delayed = true
×
2380
                                        }
×
2381
                                        found = true
×
2382
                                }
2383
                        }
2384
                        // case 1
2385
                        if !found {
×
2386
                                delayed = true
×
2387
                        }
×
2388
                }
2389
        }
2390
        return delayed
×
2391
}
2392

2393
// set nodemap only if endoint is ready and not in delayedEpSlices
2394
func (cont *AciController) setNodeMapDelay(nodeMap map[string]*metadata.ServiceEndpoint,
2395
        endpoint *discovery.Endpoint, service *v1.Service) {
×
2396
        svckey, err := cache.MetaNamespaceKeyFunc(service)
×
2397
        if err != nil {
×
2398
                cont.log.Error("Could not create service key: ", err)
×
2399
                return
×
2400
        }
×
2401
        if cont.config.NoWaitForServiceEpReadiness ||
×
2402
                (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {
×
2403
                if endpoint.NodeName != nil && *endpoint.NodeName != "" {
×
2404
                        // donot setNodeMap for endpoint if:
×
2405
                        //   endpoint is newly added
×
2406
                        //   endpoint status changed from not ready to ready
×
2407
                        if !cont.isDelayedEndpoint(endpoint, svckey) {
×
2408
                                cont.setNodeMap(nodeMap, *endpoint.NodeName)
×
2409
                        }
×
2410
                }
2411
        }
2412
}
2413

2414
func (sep *serviceEndpoint) GetnodesMetadata(key string,
2415
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2416
        cont := sep.cont
1✔
2417
        endpointsobj, exists, err := cont.endpointsIndexer.GetByKey(key)
1✔
2418
        if err != nil {
1✔
2419
                cont.log.Error("Could not lookup endpoints for " +
×
2420
                        key + ": " + err.Error())
×
2421
        }
×
2422
        if exists && endpointsobj != nil {
2✔
2423
                endpoints := endpointsobj.(*v1.Endpoints)
1✔
2424
                for _, subset := range endpoints.Subsets {
2✔
2425
                        for _, addr := range subset.Addresses {
2✔
2426
                                if addr.NodeName == nil {
2✔
2427
                                        continue
1✔
2428
                                }
2429
                                cont.setNodeMap(nodeMap, *addr.NodeName)
1✔
2430
                        }
2431
                }
2432
        }
2433
        cont.log.Info("NodeMap: ", nodeMap)
1✔
2434
}
2435

2436
func (seps *serviceEndpointSlice) GetnodesMetadata(key string,
2437
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2438
        cont := seps.cont
1✔
2439
        // 1. Get all the Endpoint slices matching the label service-name
1✔
2440
        // 2. update the node map matching with endpoints nodes name
1✔
2441
        label := map[string]string{"kubernetes.io/service-name": service.ObjectMeta.Name}
1✔
2442
        selector := labels.SelectorFromSet(label)
1✔
2443
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2444
                func(endpointSliceobj interface{}) {
2✔
2445
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2446
                        for ix := range endpointSlices.Endpoints {
2✔
2447
                                if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2448
                                        cont.setNodeMapDelay(nodeMap, &endpointSlices.Endpoints[ix], service)
×
2449
                                } else if cont.config.NoWaitForServiceEpReadiness ||
1✔
2450
                                        (endpointSlices.Endpoints[ix].Conditions.Ready != nil && *endpointSlices.Endpoints[ix].Conditions.Ready) {
2✔
2451
                                        if endpointSlices.Endpoints[ix].NodeName != nil && *endpointSlices.Endpoints[ix].NodeName != "" {
2✔
2452
                                                cont.setNodeMap(nodeMap, *endpointSlices.Endpoints[ix].NodeName)
1✔
2453
                                        }
1✔
2454
                                }
2455
                        }
2456
                })
2457
        cont.log.Debug("NodeMap: ", nodeMap)
1✔
2458
}
2459

2460
func (sep *serviceEndpoint) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2461
        cont := sep.cont
1✔
2462
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
2463
        if err != nil {
1✔
2464
                serviceLogger(cont.log, service).
×
2465
                        Error("Could not create service key: ", err)
×
2466
                return false
×
2467
        }
×
2468
        endpointsobj, _, err := cont.endpointsIndexer.GetByKey(key)
1✔
2469
        if err != nil {
1✔
2470
                cont.log.Error("Could not lookup endpoints for " +
×
2471
                        key + ": " + err.Error())
×
2472
                return false
×
2473
        }
×
2474
        if endpointsobj != nil {
2✔
2475
                for _, subset := range endpointsobj.(*v1.Endpoints).Subsets {
2✔
2476
                        for _, addr := range subset.Addresses {
2✔
2477
                                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" {
1✔
2478
                                        continue
×
2479
                                }
2480
                                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(),
1✔
2481
                                        addr.TargetRef.Name))
1✔
2482
                        }
2483
                }
2484
        }
2485
        return true
1✔
2486
}
2487

2488
func (seps *serviceEndpointSlice) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2489
        cont := seps.cont
1✔
2490
        label := map[string]string{"kubernetes.io/service-name": service.ObjectMeta.Name}
1✔
2491
        selector := labels.SelectorFromSet(label)
1✔
2492
        epcount := 0
1✔
2493
        childs := make(map[string]struct{})
1✔
2494
        var exists = struct{}{}
1✔
2495
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2496
                func(endpointSliceobj interface{}) {
2✔
2497
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2498
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2499
                                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
1✔
2500
                                        continue
×
2501
                                }
2502
                                epcount++
1✔
2503
                                childs[endpoint.TargetRef.Name] = exists
1✔
2504
                                cont.log.Debug("EndPoint added: ", endpoint.TargetRef.Name)
1✔
2505
                        }
2506
                })
2507
        for child := range childs {
2✔
2508
                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(), child))
1✔
2509
        }
1✔
2510
        return epcount != 0
1✔
2511
}
2512

2513
func getProtocolStr(proto v1.Protocol) string {
1✔
2514
        var protostring string
1✔
2515
        switch proto {
1✔
2516
        case v1.ProtocolUDP:
1✔
2517
                protostring = "udp"
1✔
2518
        case v1.ProtocolTCP:
1✔
2519
                protostring = "tcp"
1✔
2520
        case v1.ProtocolSCTP:
×
2521
                protostring = "sctp"
×
2522
        default:
×
2523
                protostring = "tcp"
×
2524
        }
2525
        return protostring
1✔
2526
}
2527

2528
func (cont *AciController) removeIpFromIngressIPList(ingressIps *[]net.IP, ip net.IP) {
×
2529
        cont.returnServiceIps([]net.IP{ip})
×
2530
        index := -1
×
2531
        for i, v := range *ingressIps {
×
2532
                if v.Equal(ip) {
×
2533
                        index = i
×
2534
                        break
×
2535
                }
2536
        }
2537
        if index == -1 {
×
2538
                return
×
2539
        }
×
2540
        *ingressIps = append((*ingressIps)[:index], (*ingressIps)[index+1:]...)
×
2541
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc