• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 8852

21 Mar 2024 03:12PM UTC coverage: 70.881% (-0.4%) from 71.311%
8852

Pull #1287

travis-pro

akhilamohanan
Do VLAN programming upfront for OpenShift on OpenStack

For OpenShift on OpenStack clusters, vnsRsCIfPathAtt is created for each
OpenStack compute hosts along with OpenShift nodes so that VLAN will
already be there when a vm is migrated from one compute host to other.

The change is done to avoid datapath issues after vm migration.
Pull Request #1287: Do VLAN programming upfront for OpenShift on OpenStack

27 of 120 new or added lines in 2 files covered. (22.5%)

175 existing lines in 4 files now uncovered.

10703 of 15100 relevant lines covered (70.88%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.04
/pkg/controller/services.go
1
// Copyright 2016 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "errors"
19
        "fmt"
20
        "net"
21
        "reflect"
22
        "sort"
23
        "strconv"
24
        "strings"
25
        "time"
26

27
        "github.com/noironetworks/aci-containers/pkg/apicapi"
28
        "github.com/noironetworks/aci-containers/pkg/metadata"
29
        "github.com/noironetworks/aci-containers/pkg/util"
30
        "github.com/sirupsen/logrus"
31
        v1 "k8s.io/api/core/v1"
32
        discovery "k8s.io/api/discovery/v1"
33
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34
        "k8s.io/apimachinery/pkg/fields"
35
        "k8s.io/apimachinery/pkg/labels"
36
        "k8s.io/client-go/kubernetes"
37
        "k8s.io/client-go/tools/cache"
38
)
39

40
// Default service contract scope value
41
const DefaultServiceContractScope = "context"
42

43
// Default service ext subnet scope - enable shared security
44
const DefaultServiceExtSubNetShared = false
45

46
func (cont *AciController) initEndpointsInformerFromClient(
47
        kubeClient kubernetes.Interface) {
×
48
        cont.initEndpointsInformerBase(
×
49
                cache.NewListWatchFromClient(
×
50
                        kubeClient.CoreV1().RESTClient(), "endpoints",
×
51
                        metav1.NamespaceAll, fields.Everything()))
×
52
}
×
53

54
func (cont *AciController) initEndpointSliceInformerFromClient(
55
        kubeClient kubernetes.Interface) {
×
56
        cont.initEndpointSliceInformerBase(
×
57
                cache.NewListWatchFromClient(
×
58
                        kubeClient.DiscoveryV1().RESTClient(), "endpointslices",
×
59
                        metav1.NamespaceAll, fields.Everything()))
×
60
}
×
61

62
func (cont *AciController) initEndpointSliceInformerBase(listWatch *cache.ListWatch) {
1✔
63
        cont.endpointSliceIndexer, cont.endpointSliceInformer = cache.NewIndexerInformer(
1✔
64
                listWatch, &discovery.EndpointSlice{}, 0,
1✔
65
                cache.ResourceEventHandlerFuncs{
1✔
66
                        AddFunc: func(obj interface{}) {
2✔
67
                                cont.endpointSliceAdded(obj)
1✔
68
                        },
1✔
69
                        UpdateFunc: func(oldobj interface{}, newobj interface{}) {
1✔
70
                                cont.endpointSliceUpdated(oldobj, newobj)
1✔
71
                        },
1✔
72
                        DeleteFunc: func(obj interface{}) {
1✔
73
                                cont.endpointSliceDeleted(obj)
1✔
74
                        },
1✔
75
                },
76
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
77
        )
78
}
79

80
func (cont *AciController) initEndpointsInformerBase(listWatch *cache.ListWatch) {
1✔
81
        cont.endpointsIndexer, cont.endpointsInformer = cache.NewIndexerInformer(
1✔
82
                listWatch, &v1.Endpoints{}, 0,
1✔
83
                cache.ResourceEventHandlerFuncs{
1✔
84
                        AddFunc: func(obj interface{}) {
2✔
85
                                cont.endpointsAdded(obj)
1✔
86
                        },
1✔
87
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
88
                                cont.endpointsUpdated(old, new)
1✔
89
                        },
1✔
90
                        DeleteFunc: func(obj interface{}) {
1✔
91
                                cont.endpointsDeleted(obj)
1✔
92
                        },
1✔
93
                },
94
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
95
        )
96
}
97

98
func (cont *AciController) initServiceInformerFromClient(
99
        kubeClient *kubernetes.Clientset) {
×
100
        cont.initServiceInformerBase(
×
101
                cache.NewListWatchFromClient(
×
102
                        kubeClient.CoreV1().RESTClient(), "services",
×
103
                        metav1.NamespaceAll, fields.Everything()))
×
104
}
×
105

106
func (cont *AciController) initServiceInformerBase(listWatch *cache.ListWatch) {
1✔
107
        cont.serviceIndexer, cont.serviceInformer = cache.NewIndexerInformer(
1✔
108
                listWatch, &v1.Service{}, 0,
1✔
109
                cache.ResourceEventHandlerFuncs{
1✔
110
                        AddFunc: func(obj interface{}) {
2✔
111
                                cont.serviceAdded(obj)
1✔
112
                        },
1✔
113
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
114
                                cont.serviceUpdated(old, new)
1✔
115
                        },
1✔
116
                        DeleteFunc: func(obj interface{}) {
×
117
                                cont.serviceDeleted(obj)
×
118
                        },
×
119
                },
120
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
121
        )
122
}
123

124
func serviceLogger(log *logrus.Logger, as *v1.Service) *logrus.Entry {
1✔
125
        return log.WithFields(logrus.Fields{
1✔
126
                "namespace": as.ObjectMeta.Namespace,
1✔
127
                "name":      as.ObjectMeta.Name,
1✔
128
                "type":      as.Spec.Type,
1✔
129
        })
1✔
130
}
1✔
131

132
func (cont *AciController) queueIPNetPolUpdates(ips map[string]bool) {
1✔
133
        for ipStr := range ips {
2✔
134
                ip := net.ParseIP(ipStr)
1✔
135
                if ip == nil {
1✔
136
                        continue
×
137
                }
138
                entries, err := cont.netPolSubnetIndex.ContainingNetworks(ip)
1✔
139
                if err != nil {
1✔
140
                        cont.log.Error("Corrupted network policy IP index, err: ", err)
×
141
                        return
×
142
                }
×
143
                for _, entry := range entries {
2✔
144
                        for npkey := range entry.(*ipIndexEntry).keys {
2✔
145
                                cont.queueNetPolUpdateByKey(npkey)
1✔
146
                        }
1✔
147
                }
148
        }
149
}
150

151
func (cont *AciController) queuePortNetPolUpdates(ports map[string]targetPort) {
1✔
152
        for portkey := range ports {
2✔
153
                entry := cont.targetPortIndex[portkey]
1✔
154
                if entry == nil {
2✔
155
                        continue
1✔
156
                }
157
                for npkey := range entry.networkPolicyKeys {
2✔
158
                        cont.queueNetPolUpdateByKey(npkey)
1✔
159
                }
1✔
160
        }
161
}
162

163
func (cont *AciController) queueNetPolForEpAddrs(addrs []v1.EndpointAddress) {
1✔
164
        for _, addr := range addrs {
2✔
165
                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" ||
1✔
166
                        addr.TargetRef.Namespace == "" || addr.TargetRef.Name == "" {
2✔
167
                        continue
1✔
168
                }
169
                podkey := addr.TargetRef.Namespace + "/" + addr.TargetRef.Name
1✔
170
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
171
                ps := make(map[string]bool)
1✔
172
                for _, npkey := range npkeys {
2✔
173
                        cont.queueNetPolUpdateByKey(npkey)
1✔
174
                        ps[npkey] = true
1✔
175
                }
1✔
176
                // Process if the  any matching namedport wildcard policy is present
177
                // ignore np already processed policies
178
                cont.queueMatchingNamedNp(ps, podkey)
1✔
179
        }
180
}
181

182
func (cont *AciController) queueMatchingNamedNp(served map[string]bool, podkey string) {
1✔
183
        cont.indexMutex.Lock()
1✔
184
        for npkey := range cont.nmPortNp {
2✔
185
                if _, ok := served[npkey]; !ok {
2✔
186
                        if cont.checkPodNmpMatchesNp(npkey, podkey) {
2✔
187
                                cont.queueNetPolUpdateByKey(npkey)
1✔
188
                        }
1✔
189
                }
190
        }
191
        cont.indexMutex.Unlock()
1✔
192
}
193
func (cont *AciController) queueEndpointsNetPolUpdates(endpoints *v1.Endpoints) {
1✔
194
        for _, subset := range endpoints.Subsets {
2✔
195
                cont.queueNetPolForEpAddrs(subset.Addresses)
1✔
196
                cont.queueNetPolForEpAddrs(subset.NotReadyAddresses)
1✔
197
        }
1✔
198
}
199

200
func (cont *AciController) returnServiceIps(ips []net.IP) {
1✔
201
        for _, ip := range ips {
2✔
202
                if ip.To4() != nil {
2✔
203
                        cont.serviceIps.DeallocateIp(ip)
1✔
204
                } else if ip.To16() != nil {
3✔
205
                        cont.serviceIps.DeallocateIp(ip)
1✔
206
                }
1✔
207
        }
208
}
209

210
func returnIps(pool *netIps, ips []net.IP) {
1✔
211
        for _, ip := range ips {
2✔
212
                if ip.To4() != nil {
2✔
213
                        pool.V4.AddIp(ip)
1✔
214
                } else if ip.To16() != nil {
3✔
215
                        pool.V6.AddIp(ip)
1✔
216
                }
1✔
217
        }
218
}
219

220
func (cont *AciController) staticMonPolName() string {
1✔
221
        return cont.aciNameForKey("monPol", cont.env.ServiceBd())
1✔
222
}
1✔
223

224
func (cont *AciController) staticMonPolDn() string {
1✔
225
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
226
                return fmt.Sprintf("uni/tn-%s/ipslaMonitoringPol-%s",
1✔
227
                        cont.config.AciVrfTenant, cont.staticMonPolName())
1✔
228
        }
1✔
229
        return ""
×
230
}
231

232
func (cont *AciController) staticServiceObjs() apicapi.ApicSlice {
1✔
233
        var serviceObjs apicapi.ApicSlice
1✔
234

1✔
235
        // Service bridge domain
1✔
236
        bdName := cont.aciNameForKey("bd", cont.env.ServiceBd())
1✔
237
        bd := apicapi.NewFvBD(cont.config.AciVrfTenant, bdName)
1✔
238
        if apicapi.ApicVersion >= "6.0(4c)" {
1✔
239
                bd.SetAttr("serviceBdRoutingDisable", "yes")
×
240
        }
×
241
        bd.SetAttr("arpFlood", "yes")
1✔
242
        bd.SetAttr("ipLearning", "no")
1✔
243
        bd.SetAttr("unkMacUcastAct", "flood")
1✔
244
        bdToOut := apicapi.NewRsBdToOut(bd.GetDn(), cont.config.AciL3Out)
1✔
245
        bd.AddChild(bdToOut)
1✔
246
        bdToVrf := apicapi.NewRsCtx(bd.GetDn(), cont.config.AciVrf)
1✔
247
        bd.AddChild(bdToVrf)
1✔
248

1✔
249
        bdn := bd.GetDn()
1✔
250
        for _, cidr := range cont.config.NodeServiceSubnets {
2✔
251
                sn := apicapi.NewFvSubnet(bdn, cidr)
1✔
252
                bd.AddChild(sn)
1✔
253
        }
1✔
254
        serviceObjs = append(serviceObjs, bd)
1✔
255

1✔
256
        // Service IP SLA monitoring policy
1✔
257
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
258
                monPol := apicapi.NewFvIPSLAMonitoringPol(cont.config.AciVrfTenant,
1✔
259
                        cont.staticMonPolName())
1✔
260
                monPol.SetAttr("slaFrequency",
1✔
261
                        strconv.Itoa(cont.config.AciServiceMonitorInterval))
1✔
262
                serviceObjs = append(serviceObjs, monPol)
1✔
263
        }
1✔
264

265
        return serviceObjs
1✔
266
}
267

268
func (cont *AciController) initStaticServiceObjs() {
1✔
269
        cont.apicConn.WriteApicObjects(cont.config.AciPrefix+"_service_static",
1✔
270
                cont.staticServiceObjs())
1✔
271
}
1✔
272

273
func (cont *AciController) updateServicesForNode(nodename string) {
1✔
274
        cont.serviceEndPoints.UpdateServicesForNode(nodename)
1✔
275
}
1✔
276

277
// must have index lock
278
func (cont *AciController) getActiveFabricPathDn(node string) string {
×
279
        var fabricPathDn string
×
280
        sz := len(cont.nodeOpflexDevice[node])
×
281
        for i := range cont.nodeOpflexDevice[node] {
×
282
                device := cont.nodeOpflexDevice[node][sz-1-i]
×
283
                if device.GetAttrStr("state") == "connected" {
×
284
                        fabricPathDn = device.GetAttrStr("fabricPathDn")
×
285
                        break
×
286
                }
287
        }
288
        return fabricPathDn
×
289
}
290

291
func (cont *AciController) getOpflexOdevCount(node string) (int, string) {
×
292
        devIdCount := make(map[string]int)
×
293
        var maxCount int
×
294
        var fabricPathDn string
×
295
        for _, device := range cont.nodeOpflexDevice[node] {
×
296
                devId := device.GetAttrStr("devId")
×
297
                count, ok := devIdCount[devId]
×
298
                if ok {
×
299
                        devIdCount[devId] = count + 1
×
300
                } else {
×
301
                        devIdCount[devId] = 1
×
302
                }
×
303
                if devIdCount[devId] >= maxCount {
×
304
                        maxCount = devIdCount[devId]
×
305
                        fabricPathDn = device.GetAttrStr("fabricPathDn")
×
306
                }
×
307
        }
308
        return maxCount, fabricPathDn
×
309
}
310

311
func deleteDevicesFromList(delDevices, devices apicapi.ApicSlice) apicapi.ApicSlice {
×
312
        var newDevices apicapi.ApicSlice
×
313
        for _, device := range devices {
×
314
                found := false
×
315
                for delDev := range delDevices {
×
316
                        if reflect.DeepEqual(delDev, device) {
×
317
                                found = true
×
318
                        }
×
319
                }
320
                if !found {
×
321
                        newDevices = append(newDevices, device)
×
322
                }
×
323
        }
324
        return newDevices
×
325
}
326

327
func (cont *AciController) getAciPodSubnet(pod string) (string, error) {
×
328
        podslice := strings.Split(pod, "-")
×
329
        if len(podslice) < 2 {
×
330
                return "", fmt.Errorf("Failed to get podid from pod")
×
331
        }
×
332
        podid := podslice[1]
×
333
        var subnet string
×
334
        args := []string{
×
335
                "query-target=self",
×
336
        }
×
337
        url := fmt.Sprintf("/api/node/mo/uni/controller/setuppol/setupp-%s.json?%s", podid, strings.Join(args, "&"))
×
338
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
339
        if err != nil {
×
340
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
341
                return subnet, err
×
342
        }
×
343
        for _, obj := range apicresp.Imdata {
×
344
                for _, body := range obj {
×
345
                        tepPool, ok := body.Attributes["tepPool"].(string)
×
346
                        if ok {
×
347
                                subnet = tepPool
×
348
                                break
×
349
                        }
350
                }
351
        }
352
        return subnet, nil
×
353
}
354

355
func (cont *AciController) createAciPodAnnotation(node string) (aciPodAnnot, error) {
×
356
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
357
        nodeAciPodAnnot := cont.nodeACIPod[node]
×
358
        isSingleOdev := nodeAciPodAnnot.isSingleOpflexOdev
×
359
        if (odevCount == 0) ||
×
360
                (odevCount == 1 && !isSingleOdev) ||
×
361
                (odevCount == 1 && isSingleOdev && !nodeAciPodAnnot.disconnectTime.IsZero()) {
×
362
                if nodeAciPodAnnot.aciPod != "none" {
×
363
                        if nodeAciPodAnnot.disconnectTime.IsZero() {
×
364
                                nodeAciPodAnnot.disconnectTime = time.Now()
×
365
                        } else {
×
366
                                currentTime := time.Now()
×
367
                                diff := currentTime.Sub(nodeAciPodAnnot.disconnectTime)
×
368
                                if diff.Seconds() > float64(cont.config.OpflexDeviceReconnectWaitTimeout) {
×
369
                                        nodeAciPodAnnot.aciPod = "none"
×
370
                                        nodeAciPodAnnot.disconnectTime = time.Time{}
×
371
                                }
×
372
                        }
373
                }
374
                if odevCount == 1 {
×
375
                        nodeAciPodAnnot.isSingleOpflexOdev = true
×
376
                }
×
377
                nodeAciPodAnnot.connectTime = time.Time{}
×
378
                return nodeAciPodAnnot, nil
×
379
        } else if (odevCount == 2) ||
×
380
                (odevCount == 1 && isSingleOdev && nodeAciPodAnnot.disconnectTime.IsZero()) {
×
381
                // when there is already a connected opflex device,
×
382
                // fabricPathDn will have latest pod iformation
×
383
                // and annotation will be in the form pod-<podid>-<subnet of pod>
×
384
                nodeAciPodAnnot.disconnectTime = time.Time{}
×
385
                nodeAciPod := nodeAciPodAnnot.aciPod
×
386
                if odevCount == 2 {
×
387
                        nodeAciPodAnnot.isSingleOpflexOdev = false
×
388
                }
×
389
                if odevCount == 1 && nodeAciPod == "none" {
×
390
                        if nodeAciPodAnnot.connectTime.IsZero() {
×
391
                                nodeAciPodAnnot.connectTime = time.Now()
×
392
                                return nodeAciPodAnnot, nil
×
393
                        } else {
×
394
                                currentTime := time.Now()
×
395
                                diff := currentTime.Sub(nodeAciPodAnnot.connectTime)
×
396
                                if diff.Seconds() < float64(10) {
×
397
                                        return nodeAciPodAnnot, nil
×
398
                                }
×
399
                        }
400
                }
401
                nodeAciPodAnnot.connectTime = time.Time{}
×
402
                pathSlice := strings.Split(fabricPathDn, "/")
×
403
                if len(pathSlice) > 1 {
×
404
                        pod := pathSlice[1]
×
405

×
406
                        // when there is difference in pod info avaliable from fabricPathDn
×
407
                        // and what we have in cache, update info in cache and change annotation on node
×
408
                        if !strings.Contains(nodeAciPod, pod) {
×
409
                                subnet, err := cont.getAciPodSubnet(pod)
×
410
                                if err != nil {
×
411
                                        cont.log.Error("Failed to get subnet of aci pod ", err.Error())
×
412
                                        return nodeAciPodAnnot, err
×
413
                                } else {
×
414
                                        nodeAciPodAnnot.aciPod = pod + "-" + subnet
×
415
                                        return nodeAciPodAnnot, nil
×
416
                                }
×
417
                        } else {
×
418
                                return nodeAciPodAnnot, nil
×
419
                        }
×
420
                } else {
×
421
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
422
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
423
                }
×
424
        }
425
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation")
×
426
}
427

428
func (cont *AciController) createNodeAciPodAnnotation(node string) (aciPodAnnot, error) {
×
429
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
430
        nodeAciPodAnnot := cont.nodeACIPodAnnot[node]
×
431
        isSingleOdev := nodeAciPodAnnot.isSingleOpflexOdev
×
432
        if (odevCount == 0) ||
×
433
                (odevCount == 1 && !isSingleOdev) {
×
434
                if nodeAciPodAnnot.aciPod != "none" {
×
435
                        nodeAciPodAnnot.aciPod = "none"
×
436
                }
×
437
                if odevCount == 1 {
×
438
                        nodeAciPodAnnot.isSingleOpflexOdev = true
×
439
                }
×
440
                nodeAciPodAnnot.connectTime = time.Time{}
×
441
                return nodeAciPodAnnot, nil
×
442
        } else if (odevCount == 2) ||
×
443
                (odevCount == 1 && isSingleOdev) {
×
444
                nodeAciPod := nodeAciPodAnnot.aciPod
×
445
                if odevCount == 2 {
×
446
                        nodeAciPodAnnot.isSingleOpflexOdev = false
×
447
                }
×
448
                if odevCount == 1 && nodeAciPod == "none" {
×
449
                        if nodeAciPodAnnot.connectTime.IsZero() {
×
450
                                nodeAciPodAnnot.connectTime = time.Now()
×
451
                                return nodeAciPodAnnot, nil
×
452
                        } else {
×
453
                                currentTime := time.Now()
×
454
                                diff := currentTime.Sub(nodeAciPodAnnot.connectTime)
×
455
                                if diff.Seconds() < float64(10) {
×
456
                                        return nodeAciPodAnnot, nil
×
457
                                }
×
458
                        }
459
                }
460
                nodeAciPodAnnot.connectTime = time.Time{}
×
461
                pathSlice := strings.Split(fabricPathDn, "/")
×
462
                if len(pathSlice) > 1 {
×
463

×
464
                        nodeAciPodAnnot.aciPod = pathSlice[1]
×
465
                        return nodeAciPodAnnot, nil
×
466
                } else {
×
467
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
468
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
469
                }
×
470
        }
471
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation")
×
472
}
473

474
func (cont *AciController) checkChangeOfOpflexOdevAciPod() {
×
475
        var nodeAnnotationUpdates []string
×
476
        cont.apicConn.SyncMutex.Lock()
×
477
        syncDone := cont.apicConn.SyncDone
×
478
        cont.apicConn.SyncMutex.Unlock()
×
479

×
480
        if !syncDone {
×
481
                return
×
482
        }
×
483

484
        cont.indexMutex.Lock()
×
485
        for node := range cont.nodeACIPodAnnot {
×
486
                annot, err := cont.createNodeAciPodAnnotation(node)
×
487
                if err != nil {
×
488
                        cont.log.Error(err.Error())
×
489
                } else {
×
490
                        if annot != cont.nodeACIPodAnnot[node] {
×
491
                                cont.nodeACIPodAnnot[node] = annot
×
492
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
493
                        }
×
494
                }
495
        }
496
        cont.indexMutex.Unlock()
×
497
        if len(nodeAnnotationUpdates) > 0 {
×
498
                for _, updatednode := range nodeAnnotationUpdates {
×
499
                        go cont.env.NodeAnnotationChanged(updatednode)
×
500
                }
×
501
        }
502
}
503

504
func (cont *AciController) checkChangeOfOdevAciPod() {
×
505
        var nodeAnnotationUpdates []string
×
506
        cont.apicConn.SyncMutex.Lock()
×
507
        syncDone := cont.apicConn.SyncDone
×
508
        cont.apicConn.SyncMutex.Unlock()
×
509

×
510
        if !syncDone {
×
511
                return
×
512
        }
×
513

514
        cont.indexMutex.Lock()
×
515
        for node := range cont.nodeACIPod {
×
516
                annot, err := cont.createAciPodAnnotation(node)
×
517
                if err != nil {
×
518
                        cont.log.Error(err.Error())
×
519
                } else {
×
520
                        if annot != cont.nodeACIPod[node] {
×
521
                                cont.nodeACIPod[node] = annot
×
522
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
523
                        }
×
524
                }
525
        }
526
        cont.indexMutex.Unlock()
×
527
        if len(nodeAnnotationUpdates) > 0 {
×
528
                for _, updatednode := range nodeAnnotationUpdates {
×
529
                        go cont.env.NodeAnnotationChanged(updatednode)
×
530
                }
×
531
        }
532
}
533

534
func (cont *AciController) deleteOldOpflexDevices() {
1✔
535
        var nodeUpdates []string
1✔
536
        cont.indexMutex.Lock()
1✔
537
        for node, devices := range cont.nodeOpflexDevice {
1✔
538
                var delDevices apicapi.ApicSlice
×
539
                fabricPathDn := cont.getActiveFabricPathDn(node)
×
540
                if fabricPathDn != "" {
×
541
                        for _, device := range devices {
×
542
                                if device.GetAttrStr("delete") == "true" && device.GetAttrStr("fabricPathDn") != fabricPathDn {
×
543
                                        deleteTimeStr := device.GetAttrStr("deleteTime")
×
544
                                        deleteTime, err := time.Parse(time.RFC3339, deleteTimeStr)
×
545
                                        if err != nil {
×
546
                                                cont.log.Error("Failed to parse opflex device delete time: ", err)
×
547
                                                continue
×
548
                                        }
549
                                        now := time.Now()
×
550
                                        diff := now.Sub(deleteTime)
×
551
                                        if diff.Seconds() >= cont.config.OpflexDeviceDeleteTimeout {
×
552
                                                delDevices = append(delDevices, device)
×
553
                                        }
×
554
                                }
555
                        }
556
                        if len(delDevices) > 0 {
×
557
                                newDevices := deleteDevicesFromList(delDevices, devices)
×
558
                                cont.nodeOpflexDevice[node] = newDevices
×
559
                                if len(newDevices) == 0 {
×
560
                                        delete(cont.nodeOpflexDevice, node)
×
561
                                }
×
562
                                nodeUpdates = append(nodeUpdates, node)
×
563
                        }
564
                }
565
        }
566
        cont.indexMutex.Unlock()
1✔
567
        if len(nodeUpdates) > 0 {
1✔
568
                cont.postOpflexDeviceDelete(nodeUpdates)
×
569
        }
×
570
}
571

572
// must have index lock
573
func (cont *AciController) setDeleteFlagForOldDevices(node, fabricPathDn string) {
1✔
574
        for _, device := range cont.nodeOpflexDevice[node] {
2✔
575
                if device.GetAttrStr("fabricPathDn") != fabricPathDn {
1✔
576
                        t := time.Now()
×
577
                        device.SetAttr("delete", "true")
×
578
                        device.SetAttr("deleteTime", t.Format(time.RFC3339))
×
579
                }
×
580
        }
581
}
582

583
// must have index lock
584
func (cont *AciController) fabricPathForNode(name string) (string, bool) {
1✔
585
        sz := len(cont.nodeOpflexDevice[name])
1✔
586
        for i := range cont.nodeOpflexDevice[name] {
2✔
587
                device := cont.nodeOpflexDevice[name][sz-1-i]
1✔
588
                deviceState := device.GetAttrStr("state")
1✔
589
                if deviceState == "connected" {
2✔
590
                        if deviceState != device.GetAttrStr("prevState") {
2✔
591
                                cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Info("Processing fabric path for node ",
1✔
592
                                        "when connected device state is found")
1✔
593
                                device.SetAttr("prevState", deviceState)
1✔
594
                        }
1✔
595
                        fabricPathDn := device.GetAttrStr("fabricPathDn")
1✔
596
                        cont.setDeleteFlagForOldDevices(name, fabricPathDn)
1✔
597
                        return fabricPathDn, true
1✔
598
                } else {
1✔
599
                        device.SetAttr("prevState", deviceState)
1✔
600
                }
1✔
601
        }
602
        if sz > 0 {
2✔
603
                // When the opflex-device for a node changes, for example during a live migration,
1✔
604
                // we end up with both the old and the new device objects till the old object
1✔
605
                // ages out on APIC. The new object is at end of the devices list (see opflexDeviceChanged),
1✔
606
                // so we return the fabricPathDn of the last opflex-device.
1✔
607
                cont.fabricPathLogger(cont.nodeOpflexDevice[name][sz-1].GetAttrStr("hostName"),
1✔
608
                        cont.nodeOpflexDevice[name][sz-1]).Info("Processing fabricPathDn for node")
1✔
609
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("fabricPathDn"), true
1✔
610
        }
1✔
611
        return "", false
1✔
612
}
613

614
// must have index lock
615
func (cont *AciController) deviceMacForNode(name string) (string, bool) {
1✔
616
        sz := len(cont.nodeOpflexDevice[name])
1✔
617
        if sz > 0 {
2✔
618
                // When the opflex-device for a node changes, for example when the
1✔
619
                // node is recreated, we end up with both the old and the new
1✔
620
                // device objects till the old object ages out on APIC. The
1✔
621
                // new object is at end of the devices list (see
1✔
622
                // opflexDeviceChanged), so we return the MAC address of the
1✔
623
                // last opflex-device.
1✔
624
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("mac"), true
1✔
625
        }
1✔
626
        return "", false
1✔
627
}
628

629
func apicRedirectDst(rpDn string, ip string, mac string,
630
        descr string, healthGroupDn string, enablePbrTracking bool) apicapi.ApicObject {
1✔
631
        dst := apicapi.NewVnsRedirectDest(rpDn, ip, mac).SetAttr("descr", descr)
1✔
632
        if healthGroupDn != "" && enablePbrTracking {
2✔
633
                dst.AddChild(apicapi.NewVnsRsRedirectHealthGroup(dst.GetDn(),
1✔
634
                        healthGroupDn))
1✔
635
        }
1✔
636
        return dst
1✔
637
}
638

639
func (cont *AciController) apicRedirectPol(name string, tenantName string, nodes []string,
640
        nodeMap map[string]*metadata.ServiceEndpoint,
641
        monPolDn string, enablePbrTracking bool) (apicapi.ApicObject, string) {
1✔
642
        rp := apicapi.NewVnsSvcRedirectPol(tenantName, name)
1✔
643
        rp.SetAttr("thresholdDownAction", "deny")
1✔
644
        rpDn := rp.GetDn()
1✔
645
        for _, node := range nodes {
2✔
646
                cont.indexMutex.Lock()
1✔
647
                serviceEp, ok := nodeMap[node]
1✔
648
                if !ok {
1✔
649
                        continue
×
650
                }
651
                if serviceEp.Ipv4 != nil {
2✔
652
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv4.String(),
1✔
653
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
1✔
654
                }
1✔
655
                if serviceEp.Ipv6 != nil {
1✔
656
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv6.String(),
×
657
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
×
658
                }
×
659
                cont.indexMutex.Unlock()
1✔
660
        }
661
        if monPolDn != "" && enablePbrTracking {
2✔
662
                rp.AddChild(apicapi.NewVnsRsIPSLAMonitoringPol(rpDn, monPolDn))
1✔
663
        }
1✔
664
        return rp, rpDn
1✔
665
}
666

667
func apicExtNetCreate(enDn string, ingress string, ipv4 bool,
668
        cidr bool, sharedSec bool) apicapi.ApicObject {
1✔
669
        if !cidr {
2✔
670
                if ipv4 {
2✔
671
                        ingress += "/32"
1✔
672
                } else {
1✔
673
                        ingress += "/128"
×
674
                }
×
675
        }
676
        subnet := apicapi.NewL3extSubnet(enDn, ingress)
1✔
677
        if sharedSec {
2✔
678
                subnet.SetAttr("scope", "import-security,shared-security")
1✔
679
        }
1✔
680
        return subnet
1✔
681
}
682

683
func apicExtNet(name string, tenantName string, l3Out string,
684
        ingresses []string, sharedSecurity bool, snat bool) apicapi.ApicObject {
1✔
685
        en := apicapi.NewL3extInstP(tenantName, l3Out, name)
1✔
686
        enDn := en.GetDn()
1✔
687
        if snat {
2✔
688
                en.AddChild(apicapi.NewFvRsCons(enDn, name))
1✔
689
        } else {
2✔
690
                en.AddChild(apicapi.NewFvRsProv(enDn, name))
1✔
691
        }
1✔
692

693
        for _, ingress := range ingresses {
2✔
694
                ip, _, _ := net.ParseCIDR(ingress)
1✔
695
                // If ingress is a subnet
1✔
696
                if ip != nil {
2✔
697
                        if ip != nil && ip.To4() != nil {
2✔
698
                                subnet := apicExtNetCreate(enDn, ingress, true, true, sharedSecurity)
1✔
699
                                en.AddChild(subnet)
1✔
700
                        } else if ip != nil && ip.To16() != nil {
1✔
701
                                subnet := apicExtNetCreate(enDn, ingress, false, true, sharedSecurity)
×
702
                                en.AddChild(subnet)
×
703
                        }
×
704
                } else {
1✔
705
                        // If ingress is an IP address
1✔
706
                        ip := net.ParseIP(ingress)
1✔
707
                        if ip != nil && ip.To4() != nil {
2✔
708
                                subnet := apicExtNetCreate(enDn, ingress, true, false, sharedSecurity)
1✔
709
                                en.AddChild(subnet)
1✔
710
                        } else if ip != nil && ip.To16() != nil {
1✔
711
                                subnet := apicExtNetCreate(enDn, ingress, false, false, sharedSecurity)
×
712
                                en.AddChild(subnet)
×
713
                        }
×
714
                }
715
        }
716
        return en
1✔
717
}
718

719
func apicDefaultEgCons(conName string, tenantName string,
720
        appProfile string, epg string) apicapi.ApicObject {
×
721
        enDn := fmt.Sprintf("uni/tn-%s/ap-%s/epg-%s", tenantName, appProfile, epg)
×
722
        return apicapi.NewFvRsCons(enDn, conName)
×
723
}
×
724

725
func apicExtNetCons(conName string, tenantName string,
726
        l3Out string, net string) apicapi.ApicObject {
1✔
727
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
728
        return apicapi.NewFvRsCons(enDn, conName)
1✔
729
}
1✔
730

731
func apicExtNetProv(conName string, tenantName string,
732
        l3Out string, net string) apicapi.ApicObject {
1✔
733
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
734
        return apicapi.NewFvRsProv(enDn, conName)
1✔
735
}
1✔
736

737
// Helper function to check if a string item exists in a slice
738
func stringInSlice(str string, list []string) bool {
1✔
739
        for _, v := range list {
2✔
740
                if v == str {
2✔
741
                        return true
1✔
742
                }
1✔
743
        }
744
        return false
×
745
}
746

747
func validScope(scope string) bool {
1✔
748
        validValues := []string{"", "context", "tenant", "global"}
1✔
749
        return stringInSlice(scope, validValues)
1✔
750
}
1✔
751

752
func (cont *AciController) getGraphNameFromContract(name, tenantName string) (string, error) {
×
753
        var graphName string
×
754
        args := []string{
×
755
                "query-target=subtree",
×
756
        }
×
757
        url := fmt.Sprintf("/api/node/mo/uni/tn-%s/brc-%s.json?%s", tenantName, name, strings.Join(args, "&"))
×
758
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
759
        if err != nil {
×
760
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
761
                return graphName, err
×
762
        }
×
763
        for _, obj := range apicresp.Imdata {
×
764
                for class, body := range obj {
×
765
                        if class == "vzRsSubjGraphAtt" {
×
766
                                tnVnsAbsGraphName, ok := body.Attributes["tnVnsAbsGraphName"].(string)
×
767
                                if ok {
×
768
                                        graphName = tnVnsAbsGraphName
×
769
                                }
×
770
                                break
×
771
                        }
772
                }
773
        }
774
        cont.log.Debug("graphName: ", graphName)
×
775
        return graphName, err
×
776
}
777

778
func apicContract(conName string, tenantName string,
779
        graphName string, scopeName string, isSnatPbrFltrChain bool,
780
        customSGAnnot bool) apicapi.ApicObject {
1✔
781
        con := apicapi.NewVzBrCP(tenantName, conName)
1✔
782
        if scopeName != "" && scopeName != "context" {
2✔
783
                con.SetAttr("scope", scopeName)
1✔
784
        }
1✔
785
        cs := apicapi.NewVzSubj(con.GetDn(), "loadbalancedservice")
1✔
786
        csDn := cs.GetDn()
1✔
787
        if isSnatPbrFltrChain {
2✔
788
                cs.SetAttr("revFltPorts", "no")
1✔
789
                inTerm := apicapi.NewVzInTerm(csDn)
1✔
790
                outTerm := apicapi.NewVzOutTerm(csDn)
1✔
791
                inTerm.AddChild(apicapi.NewVzRsInTermGraphAtt(inTerm.GetDn(), graphName))
1✔
792
                inTerm.AddChild(apicapi.NewVzRsFiltAtt(inTerm.GetDn(), conName+"_fromCons-toProv"))
1✔
793
                outTerm.AddChild(apicapi.NewVzRsOutTermGraphAtt(outTerm.GetDn(), graphName))
1✔
794
                outTerm.AddChild(apicapi.NewVzRsFiltAtt(outTerm.GetDn(), conName+"_fromProv-toCons"))
1✔
795
                cs.AddChild(inTerm)
1✔
796
                cs.AddChild(outTerm)
1✔
797
        } else {
2✔
798
                cs.AddChild(apicapi.NewVzRsSubjGraphAtt(csDn, graphName, customSGAnnot))
1✔
799
                cs.AddChild(apicapi.NewVzRsSubjFiltAtt(csDn, conName))
1✔
800
        }
1✔
801
        con.AddChild(cs)
1✔
802
        return con
1✔
803
}
804

805
func apicDevCtx(name string, tenantName string,
806
        graphName string, deviceName string, bdName string, rpDn string, isSnatPbrFltrChain bool) apicapi.ApicObject {
1✔
807
        cc := apicapi.NewVnsLDevCtx(tenantName, name, graphName, "loadbalancer")
1✔
808
        ccDn := cc.GetDn()
1✔
809
        graphDn := fmt.Sprintf("uni/tn-%s/lDevVip-%s", tenantName, deviceName)
1✔
810
        lifDn := fmt.Sprintf("%s/lIf-%s", graphDn, "interface")
1✔
811
        bdDn := fmt.Sprintf("uni/tn-%s/BD-%s", tenantName, bdName)
1✔
812
        cc.AddChild(apicapi.NewVnsRsLDevCtxToLDev(ccDn, graphDn))
1✔
813
        rpDnBase := rpDn
1✔
814
        for _, ctxConn := range []string{"consumer", "provider"} {
2✔
815
                lifCtx := apicapi.NewVnsLIfCtx(ccDn, ctxConn)
1✔
816
                if isSnatPbrFltrChain {
2✔
817
                        if ctxConn == "consumer" {
2✔
818
                                rpDn = rpDnBase + "_Cons"
1✔
819
                        } else {
2✔
820
                                rpDn = rpDnBase + "_Prov"
1✔
821
                        }
1✔
822
                }
823
                lifCtxDn := lifCtx.GetDn()
1✔
824
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToSvcRedirectPol(lifCtxDn, rpDn))
1✔
825
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToBD(lifCtxDn, bdDn))
1✔
826
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToLIf(lifCtxDn, lifDn))
1✔
827
                cc.AddChild(lifCtx)
1✔
828
        }
829
        return cc
1✔
830
}
831

832
func apicFilterEntry(filterDn string, count string, p_start string,
833
        p_end string, protocol string, stateful string, snat bool, outTerm bool) apicapi.ApicObject {
1✔
834
        fe := apicapi.NewVzEntry(filterDn, count)
1✔
835
        fe.SetAttr("etherT", "ip")
1✔
836
        fe.SetAttr("prot", protocol)
1✔
837
        if snat {
2✔
838
                if outTerm {
2✔
839
                        if protocol == "tcp" {
2✔
840
                                fe.SetAttr("tcpRules", "est")
1✔
841
                        }
1✔
842
                        // Reverse the ports for outTerm
843
                        fe.SetAttr("dFromPort", p_start)
1✔
844
                        fe.SetAttr("dToPort", p_end)
1✔
845
                } else {
1✔
846
                        fe.SetAttr("sFromPort", p_start)
1✔
847
                        fe.SetAttr("sToPort", p_end)
1✔
848
                }
1✔
849
        } else {
1✔
850
                fe.SetAttr("dFromPort", p_start)
1✔
851
                fe.SetAttr("dToPort", p_end)
1✔
852
        }
1✔
853
        fe.SetAttr("stateful", stateful)
1✔
854
        return fe
1✔
855
}
856
func apicFilter(name string, tenantName string,
857
        portSpec []v1.ServicePort, snat bool, snatRange portRangeSnat) apicapi.ApicObject {
1✔
858
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
859
        filterDn := filter.GetDn()
1✔
860

1✔
861
        var i int
1✔
862
        var port v1.ServicePort
1✔
863
        for i, port = range portSpec {
2✔
864
                pstr := strconv.Itoa(int(port.Port))
1✔
865
                proto := getProtocolStr(port.Protocol)
1✔
866
                fe := apicFilterEntry(filterDn, strconv.Itoa(i), pstr,
1✔
867
                        pstr, proto, "no", false, false)
1✔
868
                filter.AddChild(fe)
1✔
869
        }
1✔
870

871
        if snat {
1✔
872
                portSpec := []portRangeSnat{snatRange}
×
873
                p_start := strconv.Itoa(portSpec[0].start)
×
874
                p_end := strconv.Itoa(portSpec[0].end)
×
875

×
876
                fe1 := apicFilterEntry(filterDn, strconv.Itoa(i+1), p_start,
×
877
                        p_end, "tcp", "no", false, false)
×
878
                filter.AddChild(fe1)
×
879
                fe2 := apicFilterEntry(filterDn, strconv.Itoa(i+2), p_start,
×
880
                        p_end, "udp", "no", false, false)
×
881
                filter.AddChild(fe2)
×
882
        }
×
883
        return filter
1✔
884
}
885

886
func apicFilterSnat(name string, tenantName string,
887
        portSpec []portRangeSnat, outTerm bool) apicapi.ApicObject {
1✔
888
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
889
        filterDn := filter.GetDn()
1✔
890

1✔
891
        p_start := strconv.Itoa(portSpec[0].start)
1✔
892
        p_end := strconv.Itoa(portSpec[0].end)
1✔
893

1✔
894
        fe := apicFilterEntry(filterDn, "0", p_start,
1✔
895
                p_end, "tcp", "no", true, outTerm)
1✔
896
        filter.AddChild(fe)
1✔
897
        fe1 := apicFilterEntry(filterDn, "1", p_start,
1✔
898
                p_end, "udp", "no", true, outTerm)
1✔
899
        filter.AddChild(fe1)
1✔
900

1✔
901
        return filter
1✔
902
}
1✔
903

904
func (cont *AciController) updateServiceDeviceInstance(key string,
905
        service *v1.Service) error {
1✔
906
        cont.indexMutex.Lock()
1✔
907
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
908
        cont.serviceEndPoints.GetnodesMetadata(key, service, nodeMap)
1✔
909
        cont.indexMutex.Unlock()
1✔
910

1✔
911
        var nodes []string
1✔
912
        for node := range nodeMap {
2✔
913
                nodes = append(nodes, node)
1✔
914
        }
1✔
915
        sort.Strings(nodes)
1✔
916
        name := cont.aciNameForKey("svc", key)
1✔
917
        var conScope string
1✔
918
        scopeVal, ok := service.ObjectMeta.Annotations[metadata.ServiceContractScopeAnnotation]
1✔
919
        if ok {
2✔
920
                normScopeVal := strings.ToLower(scopeVal)
1✔
921
                if !validScope(normScopeVal) {
1✔
922
                        errString := "Invalid service contract scope value provided " + scopeVal
×
923
                        err := errors.New(errString)
×
924
                        serviceLogger(cont.log, service).Error("Could not create contract: ", err)
×
925
                        return err
×
926
                } else {
1✔
927
                        conScope = normScopeVal
1✔
928
                }
1✔
929
        } else {
1✔
930
                conScope = DefaultServiceContractScope
1✔
931
        }
1✔
932

933
        var sharedSecurity bool
1✔
934
        if conScope == "global" {
2✔
935
                sharedSecurity = true
1✔
936
        } else {
2✔
937
                sharedSecurity = DefaultServiceExtSubNetShared
1✔
938
        }
1✔
939

940
        graphName := cont.aciNameForKey("svc", "global")
1✔
941
        deviceName := cont.aciNameForKey("svc", "global")
1✔
942
        _, customSGAnnPresent := service.ObjectMeta.Annotations[metadata.ServiceGraphNameAnnotation]
1✔
943
        if customSGAnnPresent {
1✔
944
                customSG, err := cont.getGraphNameFromContract(name, cont.config.AciVrfTenant)
×
945
                if err == nil {
×
946
                        graphName = customSG
×
947
                }
×
948
        }
949
        cont.log.Debug("Using service graph ", graphName, " for service ", key)
1✔
950

1✔
951
        var serviceObjs apicapi.ApicSlice
1✔
952
        if len(nodes) > 0 {
2✔
953
                // 1. Service redirect policy
1✔
954
                // The service redirect policy contains the MAC address
1✔
955
                // and IP address of each of the service endpoints for
1✔
956
                // each node that hosts a pod for this service.  The
1✔
957
                // example below shows the case of two nodes.
1✔
958
                rp, rpDn :=
1✔
959
                        cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
1✔
960
                                nodeMap, cont.staticMonPolDn(), cont.config.AciPbrTrackingNonSnat)
1✔
961
                serviceObjs = append(serviceObjs, rp)
1✔
962

1✔
963
                // 2. Service graph contract and external network
1✔
964
                // The service graph contract must be bound to the service
1✔
965
                // graph.  This contract must be consumed by the default
1✔
966
                // layer 3 network and provided by the service layer 3
1✔
967
                // network.
1✔
968
                {
2✔
969
                        var ingresses []string
1✔
970
                        for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
971
                                ingresses = append(ingresses, ingress.IP)
1✔
972
                        }
1✔
973
                        serviceObjs = append(serviceObjs,
1✔
974
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
975
                                        cont.config.AciL3Out, ingresses, sharedSecurity, false))
1✔
976
                }
977

978
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, false, customSGAnnPresent)
1✔
979
                serviceObjs = append(serviceObjs, contract)
1✔
980
                for _, net := range cont.config.AciExtNetworks {
2✔
981
                        serviceObjs = append(serviceObjs,
1✔
982
                                apicExtNetCons(name, cont.config.AciVrfTenant,
1✔
983
                                        cont.config.AciL3Out, net))
1✔
984
                }
1✔
985

986
                if cont.config.AddExternalContractToDefaultEPG && service.Spec.Type == v1.ServiceTypeLoadBalancer {
1✔
987
                        defaultEpgTenant := cont.config.DefaultEg.PolicySpace
×
988
                        defaultEpgStringSplit := strings.Split(cont.config.DefaultEg.Name, "|")
×
989
                        var defaultEpgName, appProfile string
×
990
                        if len(defaultEpgStringSplit) > 1 {
×
991
                                appProfile = defaultEpgStringSplit[0]
×
992
                                defaultEpgName = defaultEpgStringSplit[1]
×
993
                        } else {
×
994
                                appProfile = cont.config.AppProfile
×
995
                                defaultEpgName = defaultEpgStringSplit[0]
×
996
                        }
×
997
                        serviceObjs = append(serviceObjs,
×
998
                                apicDefaultEgCons(name, defaultEpgTenant, appProfile, defaultEpgName))
×
999
                }
1000

1001
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1002
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1003

1✔
1004
                _, snat := cont.snatServices[key]
1✔
1005
                filter := apicFilter(name, cont.config.AciVrfTenant,
1✔
1006
                        service.Spec.Ports, snat, defaultPortRange)
1✔
1007
                serviceObjs = append(serviceObjs, filter)
1✔
1008

1✔
1009
                // 3. Device cluster context
1✔
1010
                // The logical device context binds the service contract
1✔
1011
                // to the redirect policy and the device cluster and
1✔
1012
                // bridge domain for the device cluster.
1✔
1013
                serviceObjs = append(serviceObjs,
1✔
1014
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, deviceName,
1✔
1015
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, false))
1✔
1016
        }
1017

1018
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1019
        return nil
1✔
1020
}
1021

1022
func (cont *AciController) updateServiceDeviceInstanceSnat(key string) error {
1✔
1023
        nodeList := cont.nodeIndexer.List()
1✔
1024
        cont.indexMutex.Lock()
1✔
1025
        if len(cont.nodeServiceMetaCache) == 0 {
2✔
1026
                cont.indexMutex.Unlock()
1✔
1027
                return nil
1✔
1028
        }
1✔
1029
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
1030
        sort.Slice(nodeList, func(i, j int) bool {
2✔
1031
                nodeA := nodeList[i].(*v1.Node)
1✔
1032
                nodeB := nodeList[j].(*v1.Node)
1✔
1033
                return nodeA.ObjectMeta.Name < nodeB.ObjectMeta.Name
1✔
1034
        })
1✔
1035
        for itr, nodeItem := range nodeList {
2✔
1036
                if itr == cont.config.MaxSvcGraphNodes {
1✔
1037
                        break
×
1038
                }
1039
                node := nodeItem.(*v1.Node)
1✔
1040
                nodeName := node.ObjectMeta.Name
1✔
1041
                nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
1042
                if !ok {
2✔
1043
                        continue
1✔
1044
                }
1045
                _, ok = cont.fabricPathForNode(nodeName)
1✔
1046
                if !ok {
1✔
1047
                        continue
×
1048
                }
1049
                nodeLabels := node.ObjectMeta.Labels
1✔
1050
                excludeNode := cont.nodeLabelsInExcludeList(nodeLabels)
1✔
1051
                if excludeNode {
1✔
1052
                        continue
×
1053
                }
1054
                nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
1055
        }
1056
        cont.indexMutex.Unlock()
1✔
1057

1✔
1058
        var nodes []string
1✔
1059
        for node := range nodeMap {
2✔
1060
                nodes = append(nodes, node)
1✔
1061
        }
1✔
1062
        sort.Strings(nodes)
1✔
1063
        name := cont.aciNameForKey("snat", key)
1✔
1064
        var conScope = cont.config.SnatSvcContractScope
1✔
1065
        sharedSecurity := true
1✔
1066

1✔
1067
        graphName := cont.aciNameForKey("svc", "global")
1✔
1068
        var serviceObjs apicapi.ApicSlice
1✔
1069
        if len(nodes) > 0 {
2✔
1070
                // 1. Service redirect policy
1✔
1071
                // The service redirect policy contains the MAC address
1✔
1072
                // and IP address of each of the service endpoints for
1✔
1073
                // each node that hosts a pod for this service.
1✔
1074
                // For SNAT with the introduction of filter-chain usage, to work-around
1✔
1075
                // an APIC limitation, creating two PBR policies with same nodes.
1✔
1076
                var rpDn string
1✔
1077
                var rp apicapi.ApicObject
1✔
1078
                if cont.apicConn.SnatPbrFltrChain {
2✔
1079
                        rpCons, rpDnCons :=
1✔
1080
                                cont.apicRedirectPol(name+"_Cons", cont.config.AciVrfTenant, nodes,
1✔
1081
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1082
                        serviceObjs = append(serviceObjs, rpCons)
1✔
1083
                        rpProv, _ :=
1✔
1084
                                cont.apicRedirectPol(name+"_Prov", cont.config.AciVrfTenant, nodes,
1✔
1085
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1086
                        serviceObjs = append(serviceObjs, rpProv)
1✔
1087
                        rpDn = strings.TrimSuffix(rpDnCons, "_Cons")
1✔
1088
                } else {
1✔
1089
                        rp, rpDn =
×
1090
                                cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
×
1091
                                        nodeMap, cont.staticMonPolDn(), true)
×
1092
                        serviceObjs = append(serviceObjs, rp)
×
1093
                }
×
1094
                // 2. Service graph contract and external network
1095
                // The service graph contract must be bound to the
1096
                // service
1097
                // graph.  This contract must be consumed by the default
1098
                // layer 3 network and provided by the service layer 3
1099
                // network.
1100
                {
1✔
1101
                        var ingresses []string
1✔
1102
                        for _, policy := range cont.snatPolicyCache {
2✔
1103
                                ingresses = append(ingresses, policy.SnatIp...)
1✔
1104
                        }
1✔
1105
                        serviceObjs = append(serviceObjs,
1✔
1106
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1107
                                        cont.config.AciL3Out, ingresses, sharedSecurity, true))
1✔
1108
                }
1109

1110
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, cont.apicConn.SnatPbrFltrChain, false)
1✔
1111
                serviceObjs = append(serviceObjs, contract)
1✔
1112

1✔
1113
                for _, net := range cont.config.AciExtNetworks {
2✔
1114
                        serviceObjs = append(serviceObjs,
1✔
1115
                                apicExtNetProv(name, cont.config.AciVrfTenant,
1✔
1116
                                        cont.config.AciL3Out, net))
1✔
1117
                }
1✔
1118

1119
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1120
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1121
                portSpec := []portRangeSnat{defaultPortRange}
1✔
1122
                if cont.apicConn.SnatPbrFltrChain {
2✔
1123
                        filterIn := apicFilterSnat(name+"_fromCons-toProv", cont.config.AciVrfTenant, portSpec, false)
1✔
1124
                        serviceObjs = append(serviceObjs, filterIn)
1✔
1125
                        filterOut := apicFilterSnat(name+"_fromProv-toCons", cont.config.AciVrfTenant, portSpec, true)
1✔
1126
                        serviceObjs = append(serviceObjs, filterOut)
1✔
1127
                } else {
1✔
1128
                        filter := apicFilterSnat(name, cont.config.AciVrfTenant, portSpec, false)
×
1129
                        serviceObjs = append(serviceObjs, filter)
×
1130
                }
×
1131
                // 3. Device cluster context
1132
                // The logical device context binds the service contract
1133
                // to the redirect policy and the device cluster and
1134
                // bridge domain for the device cluster.
1135
                serviceObjs = append(serviceObjs,
1✔
1136
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, graphName,
1✔
1137
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, cont.apicConn.SnatPbrFltrChain))
1✔
1138
        }
1139
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1140
        return nil
1✔
1141
}
1142

1143
func (cont *AciController) nodeLabelsInExcludeList(Labels map[string]string) bool {
1✔
1144
        nodeSnatRedirectExclude := cont.config.NodeSnatRedirectExclude
1✔
1145

1✔
1146
        for _, nodeGroup := range nodeSnatRedirectExclude {
1✔
1147
                if len(nodeGroup.Labels) == 0 {
×
1148
                        continue
×
1149
                }
1150
                matchFound := true
×
1151
                for _, label := range nodeGroup.Labels {
×
1152
                        if _, ok := Labels["node-role.kubernetes.io/"+label]; !ok {
×
1153
                                matchFound = false
×
1154
                                break
×
1155
                        }
1156
                }
1157
                if matchFound {
×
1158
                        return true
×
1159
                }
×
1160
        }
1161
        return false
1✔
1162
}
1163

1164
func (cont *AciController) queueServiceUpdateByKey(key string) {
1✔
1165
        cont.serviceQueue.Add(key)
1✔
1166
}
1✔
1167

1168
func (cont *AciController) queueServiceUpdate(service *v1.Service) {
1✔
1169
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
1170
        if err != nil {
1✔
1171
                serviceLogger(cont.log, service).
×
1172
                        Error("Could not create service key: ", err)
×
1173
                return
×
1174
        }
×
1175
        cont.serviceQueue.Add(key)
1✔
1176
}
1177

1178
func apicDeviceCluster(name string, vrfTenant string,
1179
        physDom string, encap string,
1180
        nodes []string, nodeMap map[string]string) (apicapi.ApicObject, string) {
1✔
1181
        dc := apicapi.NewVnsLDevVip(vrfTenant, name)
1✔
1182
        dc.SetAttr("managed", "no")
1✔
1183
        dcDn := dc.GetDn()
1✔
1184
        dc.AddChild(apicapi.NewVnsRsALDevToPhysDomP(dcDn,
1✔
1185
                fmt.Sprintf("uni/phys-%s", physDom)))
1✔
1186
        lif := apicapi.NewVnsLIf(dcDn, "interface")
1✔
1187
        lif.SetAttr("encap", encap)
1✔
1188
        lifDn := lif.GetDn()
1✔
1189

1✔
1190
        for _, node := range nodes {
2✔
1191
                path, ok := nodeMap[node]
1✔
1192
                if !ok {
1✔
1193
                        continue
×
1194
                }
1195

1196
                cdev := apicapi.NewVnsCDev(dcDn, node)
1✔
1197
                cif := apicapi.NewVnsCif(cdev.GetDn(), "interface")
1✔
1198
                cif.AddChild(apicapi.NewVnsRsCIfPathAtt(cif.GetDn(), path))
1✔
1199
                cdev.AddChild(cif)
1✔
1200
                lif.AddChild(apicapi.NewVnsRsCIfAttN(lifDn, cif.GetDn()))
1✔
1201
                dc.AddChild(cdev)
1✔
1202
        }
1203

1204
        dc.AddChild(lif)
1✔
1205

1✔
1206
        return dc, dcDn
1✔
1207
}
1208

1209
func apicServiceGraph(name string, tenantName string,
1210
        dcDn string) apicapi.ApicObject {
1✔
1211
        sg := apicapi.NewVnsAbsGraph(tenantName, name)
1✔
1212
        sgDn := sg.GetDn()
1✔
1213
        var provDn string
1✔
1214
        var consDn string
1✔
1215
        var cTermDn string
1✔
1216
        var pTermDn string
1✔
1217
        {
2✔
1218
                an := apicapi.NewVnsAbsNode(sgDn, "loadbalancer")
1✔
1219
                an.SetAttr("managed", "no")
1✔
1220
                an.SetAttr("routingMode", "Redirect")
1✔
1221
                anDn := an.GetDn()
1✔
1222
                cons := apicapi.NewVnsAbsFuncConn(anDn, "consumer")
1✔
1223
                consDn = cons.GetDn()
1✔
1224
                an.AddChild(cons)
1✔
1225
                prov := apicapi.NewVnsAbsFuncConn(anDn, "provider")
1✔
1226
                provDn = prov.GetDn()
1✔
1227
                an.AddChild(prov)
1✔
1228
                an.AddChild(apicapi.NewVnsRsNodeToLDev(anDn, dcDn))
1✔
1229
                sg.AddChild(an)
1✔
1230
        }
1✔
1231
        {
1✔
1232
                tnc := apicapi.NewVnsAbsTermNodeCon(sgDn, "T1")
1✔
1233
                tncDn := tnc.GetDn()
1✔
1234
                cTerm := apicapi.NewVnsAbsTermConn(tncDn)
1✔
1235
                cTermDn = cTerm.GetDn()
1✔
1236
                tnc.AddChild(cTerm)
1✔
1237
                tnc.AddChild(apicapi.NewVnsInTerm(tncDn))
1✔
1238
                tnc.AddChild(apicapi.NewVnsOutTerm(tncDn))
1✔
1239
                sg.AddChild(tnc)
1✔
1240
        }
1✔
1241
        {
1✔
1242
                tnp := apicapi.NewVnsAbsTermNodeProv(sgDn, "T2")
1✔
1243
                tnpDn := tnp.GetDn()
1✔
1244
                pTerm := apicapi.NewVnsAbsTermConn(tnpDn)
1✔
1245
                pTermDn = pTerm.GetDn()
1✔
1246
                tnp.AddChild(pTerm)
1✔
1247
                tnp.AddChild(apicapi.NewVnsInTerm(tnpDn))
1✔
1248
                tnp.AddChild(apicapi.NewVnsOutTerm(tnpDn))
1✔
1249
                sg.AddChild(tnp)
1✔
1250
        }
1✔
1251
        {
1✔
1252
                acc := apicapi.NewVnsAbsConnection(sgDn, "C1")
1✔
1253
                acc.SetAttr("connDir", "provider")
1✔
1254
                accDn := acc.GetDn()
1✔
1255
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, consDn))
1✔
1256
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, cTermDn))
1✔
1257
                sg.AddChild(acc)
1✔
1258
        }
1✔
1259
        {
1✔
1260
                acp := apicapi.NewVnsAbsConnection(sgDn, "C2")
1✔
1261
                acp.SetAttr("connDir", "provider")
1✔
1262
                acpDn := acp.GetDn()
1✔
1263
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, provDn))
1✔
1264
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, pTermDn))
1✔
1265
                sg.AddChild(acp)
1✔
1266
        }
1✔
1267
        return sg
1✔
1268
}
1269
func (cont *AciController) updateDeviceCluster() {
1✔
1270
        nodeMap := make(map[string]string)
1✔
1271
        cont.indexMutex.Lock()
1✔
1272
        for node := range cont.nodeOpflexDevice {
2✔
1273
                cont.log.Debug("Processing node in nodeOpflexDevice cache : ", node)
1✔
1274
                fabricPath, ok := cont.fabricPathForNode(node)
1✔
1275
                if !ok {
2✔
1276
                        continue
1✔
1277
                }
1278
                nodeMap[node] = fabricPath
1✔
1279
        }
1280

1281
        // For clusters other than OpenShift On OpenStack,
1282
        // openStackFabricPathDnMap will be empty
1283
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
NEW
1284
                nodeMap[host] = opflexOdevInfo.fabricPathDn
×
NEW
1285
        }
×
1286
        cont.indexMutex.Unlock()
1✔
1287

1✔
1288
        var nodes []string
1✔
1289
        for node := range nodeMap {
2✔
1290
                nodes = append(nodes, node)
1✔
1291
        }
1✔
1292
        sort.Strings(nodes)
1✔
1293

1✔
1294
        name := cont.aciNameForKey("svc", "global")
1✔
1295
        var serviceObjs apicapi.ApicSlice
1✔
1296

1✔
1297
        // 1. Device cluster:
1✔
1298
        // The device cluster is a set of physical paths that need to be
1✔
1299
        // created for each node in the cluster, that correspond to the
1✔
1300
        // service interface for each node.
1✔
1301
        dc, dcDn := apicDeviceCluster(name, cont.config.AciVrfTenant,
1✔
1302
                cont.config.AciServicePhysDom, cont.config.AciServiceEncap,
1✔
1303
                nodes, nodeMap)
1✔
1304
        serviceObjs = append(serviceObjs, dc)
1✔
1305

1✔
1306
        // 2. Service graph template
1✔
1307
        // The service graph controls how the traffic will be redirected.
1✔
1308
        // A service graph must be created for each device cluster.
1✔
1309
        serviceObjs = append(serviceObjs,
1✔
1310
                apicServiceGraph(name, cont.config.AciVrfTenant, dcDn))
1✔
1311

1✔
1312
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1313
}
1314

1315
func (cont *AciController) fabricPathLogger(node string,
1316
        obj apicapi.ApicObject) *logrus.Entry {
1✔
1317
        return cont.log.WithFields(logrus.Fields{
1✔
1318
                "fabricPath": obj.GetAttr("fabricPathDn"),
1✔
1319
                "mac":        obj.GetAttr("mac"),
1✔
1320
                "node":       node,
1✔
1321
                "obj":        obj,
1✔
1322
        })
1✔
1323
}
1✔
1324

NEW
1325
func (cont *AciController) setOpenStackSystemId() string {
×
NEW
1326

×
NEW
1327
        // 1) get opflexIDEp with containerName == <node name of any one of the openshift nodes>
×
NEW
1328
        // 2) extract OpenStack system id from compHvDn attribute
×
NEW
1329
        //    comp/prov-OpenStack/ctrlr-[k8s-scale]-k8s-scale/hv-overcloud-novacompute-0 - sample compHvDn,
×
NEW
1330
        //    where k8s-scale is the system id
×
NEW
1331

×
NEW
1332
        var systemId string
×
NEW
1333
        nodeList := cont.nodeIndexer.List()
×
NEW
1334
        if len(nodeList) < 1 {
×
NEW
1335
                return systemId
×
NEW
1336
        }
×
NEW
1337
        node := nodeList[0].(*v1.Node)
×
NEW
1338
        nodeName := node.ObjectMeta.Name
×
NEW
1339
        opflexIDEpFilter := fmt.Sprintf("query-target-filter=and(eq(opflexIDEp.containerName,\"%s\"))", nodeName)
×
NEW
1340
        opflexIDEpArgs := []string{
×
NEW
1341
                opflexIDEpFilter,
×
NEW
1342
        }
×
NEW
1343
        url := fmt.Sprintf("/api/node/class/opflexIDEp.json?%s", strings.Join(opflexIDEpArgs, "&"))
×
NEW
1344
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
NEW
1345
        if err != nil {
×
NEW
1346
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
NEW
1347
                return systemId
×
NEW
1348
        }
×
NEW
1349
        for _, obj := range apicresp.Imdata {
×
NEW
1350
                for _, body := range obj {
×
NEW
1351
                        compHvDn, ok := body.Attributes["compHvDn"].(string)
×
NEW
1352
                        if ok {
×
NEW
1353
                                systemId = compHvDn[strings.IndexByte(compHvDn, '[')+1 : strings.IndexByte(compHvDn, ']')]
×
NEW
1354
                                break
×
1355
                        }
1356
                }
1357
        }
NEW
1358
        cont.indexMutex.Lock()
×
NEW
1359
        cont.openStackSystemId = systemId
×
NEW
1360
        cont.log.Info("Setting OpenStack system id : ", cont.openStackSystemId)
×
NEW
1361
        cont.indexMutex.Unlock()
×
NEW
1362
        return systemId
×
1363
}
1364

1365
// Returns true when a new OpenStack opflexODev is added
NEW
1366
func (cont *AciController) openStackOpflexOdevUpdate(obj apicapi.ApicObject) bool {
×
NEW
1367

×
NEW
1368
        // If opflexOdev compHvDn contains comp/prov-OpenShift/ctrlr-[<systemid>]-<systemid>,
×
NEW
1369
        // it means that it is an OpenStack OpflexOdev which belongs to OpenStack with system id <systemid>
×
NEW
1370

×
NEW
1371
        var deviceClusterUpdate bool
×
NEW
1372
        compHvDn := obj.GetAttrStr("compHvDn")
×
NEW
1373
        if strings.Contains(compHvDn, "prov-OpenStack") {
×
NEW
1374
                cont.indexMutex.Lock()
×
NEW
1375
                systemId := cont.openStackSystemId
×
NEW
1376
                cont.indexMutex.Unlock()
×
NEW
1377
                if systemId == "" {
×
NEW
1378
                        systemId = cont.setOpenStackSystemId()
×
NEW
1379
                }
×
NEW
1380
                if systemId == "" {
×
NEW
1381
                        cont.log.Error("Failed  to get OpenStack system id")
×
NEW
1382
                        return deviceClusterUpdate
×
NEW
1383
                }
×
NEW
1384
                prefix := fmt.Sprintf("comp/prov-OpenStack/ctrlr-[%s]-%s", systemId, systemId)
×
NEW
1385
                if strings.Contains(compHvDn, prefix) {
×
NEW
1386
                        cont.log.Info("Received notification for OpenStack opflexODev update, hostName: ",
×
NEW
1387
                                obj.GetAttrStr("hostName"), " dn: ", obj.GetAttrStr("dn"))
×
NEW
1388
                        cont.indexMutex.Lock()
×
NEW
1389
                        opflexOdevInfo, ok := cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")]
×
NEW
1390
                        if ok {
×
NEW
1391
                                opflexOdevInfo.opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
NEW
1392
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = opflexOdevInfo
×
NEW
1393
                        } else {
×
NEW
1394
                                var openstackopflexodevinfo openstackOpflexOdevInfo
×
NEW
1395
                                opflexODevDn := make(map[string]struct{})
×
NEW
1396
                                opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
NEW
1397
                                openstackopflexodevinfo.fabricPathDn = obj.GetAttrStr("fabricPathDn")
×
NEW
1398
                                openstackopflexodevinfo.opflexODevDn = opflexODevDn
×
NEW
1399
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = openstackopflexodevinfo
×
NEW
1400
                                deviceClusterUpdate = true
×
NEW
1401
                        }
×
NEW
1402
                        cont.indexMutex.Unlock()
×
1403
                }
1404
        }
NEW
1405
        return deviceClusterUpdate
×
1406
}
1407

1408
func (cont *AciController) opflexDeviceChanged(obj apicapi.ApicObject) {
1✔
1409
        devType := obj.GetAttrStr("devType")
1✔
1410
        domName := obj.GetAttrStr("domName")
1✔
1411
        ctrlrName := obj.GetAttrStr("ctrlrName")
1✔
1412

1✔
1413
        if strings.Contains(cont.config.Flavor, "openstack") {
1✔
NEW
1414
                if cont.openStackOpflexOdevUpdate(obj) {
×
NEW
1415
                        cont.log.Info("OpenStack opflexODev for ", obj.GetAttrStr("hostName"), " is added")
×
NEW
1416
                        cont.updateDeviceCluster()
×
NEW
1417
                }
×
1418
        }
1419
        if (devType == cont.env.OpFlexDeviceType()) && (domName == cont.config.AciVmmDomain) && (ctrlrName == cont.config.AciVmmController) {
2✔
1420
                cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Processing opflex device update")
1✔
1421
                if obj.GetAttrStr("state") == "disconnected" {
2✔
1422
                        cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Opflex device disconnected")
1✔
1423
                        cont.indexMutex.Lock()
1✔
1424
                        for node, devices := range cont.nodeOpflexDevice {
1✔
1425
                                if node == obj.GetAttrStr("hostName") {
×
1426
                                        for _, device := range devices {
×
1427
                                                if device.GetDn() == obj.GetDn() {
×
1428
                                                        device.SetAttr("state", "disconnected")
×
1429
                                                        cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Debug("Opflex device cache updated for disconnected node")
×
1430
                                                }
×
1431
                                        }
1432
                                        cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", devices)
×
1433
                                        break
×
1434
                                }
1435
                        }
1436
                        cont.indexMutex.Unlock()
1✔
1437
                        cont.updateDeviceCluster()
1✔
1438
                        return
1✔
1439
                }
1440
                var nodeUpdates []string
1✔
1441

1✔
1442
                cont.indexMutex.Lock()
1✔
1443
                nodefound := false
1✔
1444
                for node, devices := range cont.nodeOpflexDevice {
2✔
1445
                        found := false
1✔
1446

1✔
1447
                        if node == obj.GetAttrStr("hostName") {
2✔
1448
                                nodefound = true
1✔
1449
                        }
1✔
1450

1451
                        for i, device := range devices {
2✔
1452
                                if device.GetDn() != obj.GetDn() {
2✔
1453
                                        continue
1✔
1454
                                }
1455
                                found = true
1✔
1456

1✔
1457
                                if obj.GetAttrStr("hostName") != node {
2✔
1458
                                        cont.fabricPathLogger(node, device).
1✔
1459
                                                Debug("Moving opflex device from node")
1✔
1460

1✔
1461
                                        devices = append(devices[:i], devices[i+1:]...)
1✔
1462
                                        cont.nodeOpflexDevice[node] = devices
1✔
1463
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1464
                                        break
1✔
1465
                                } else if (device.GetAttrStr("mac") != obj.GetAttrStr("mac")) ||
1✔
1466
                                        (device.GetAttrStr("fabricPathDn") != obj.GetAttrStr("fabricPathDn")) ||
1✔
1467
                                        (device.GetAttrStr("state") != obj.GetAttrStr("state")) {
2✔
1468
                                        cont.fabricPathLogger(node, obj).
1✔
1469
                                                Debug("Updating opflex device")
1✔
1470

1✔
1471
                                        devices = append(append(devices[:i], devices[i+1:]...), obj)
1✔
1472
                                        cont.nodeOpflexDevice[node] = devices
1✔
1473
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1474
                                        break
1✔
1475
                                }
1476
                        }
1477
                        if !found && obj.GetAttrStr("hostName") == node {
2✔
1478
                                cont.fabricPathLogger(node, obj).
1✔
1479
                                        Debug("Appending opflex device")
1✔
1480

1✔
1481
                                devices = append(devices, obj)
1✔
1482
                                cont.nodeOpflexDevice[node] = devices
1✔
1483
                                nodeUpdates = append(nodeUpdates, node)
1✔
1484
                        }
1✔
1485
                }
1486
                if !nodefound {
2✔
1487
                        node := obj.GetAttrStr("hostName")
1✔
1488
                        cont.fabricPathLogger(node, obj).Debug("Adding opflex device")
1✔
1489
                        cont.nodeOpflexDevice[node] = apicapi.ApicSlice{obj}
1✔
1490
                        nodeUpdates = append(nodeUpdates, node)
1✔
1491
                }
1✔
1492
                cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", cont.nodeOpflexDevice[obj.GetAttrStr("hostName")])
1✔
1493
                cont.indexMutex.Unlock()
1✔
1494

1✔
1495
                for _, node := range nodeUpdates {
2✔
1496
                        cont.env.NodeServiceChanged(node)
1✔
1497
                        cont.erspanSyncOpflexDev()
1✔
1498
                }
1✔
1499
                cont.updateDeviceCluster()
1✔
1500
        }
1501
}
1502

1503
func (cont *AciController) postOpflexDeviceDelete(nodes []string) {
1✔
1504
        cont.updateDeviceCluster()
1✔
1505
        for _, node := range nodes {
2✔
1506
                cont.env.NodeServiceChanged(node)
1✔
1507
                cont.erspanSyncOpflexDev()
1✔
1508
        }
1✔
1509
}
1510

1511
func (cont *AciController) opflexDeviceDeleted(dn string) {
1✔
1512
        var nodeUpdates []string
1✔
1513
        var dnFound bool //to check if the dn belongs to this cluster
1✔
1514
        cont.indexMutex.Lock()
1✔
1515
        for node, devices := range cont.nodeOpflexDevice {
2✔
1516
                for i, device := range devices {
2✔
1517
                        if device.GetDn() != dn {
2✔
1518
                                continue
1✔
1519
                        }
1520
                        dnFound = true
1✔
1521
                        cont.fabricPathLogger(node, device).
1✔
1522
                                Debug("Deleting opflex device path")
1✔
1523
                        devices = append(devices[:i], devices[i+1:]...)
1✔
1524
                        cont.nodeOpflexDevice[node] = devices
1✔
1525
                        nodeUpdates = append(nodeUpdates, node)
1✔
1526
                        break
1✔
1527
                }
1528
                if len(devices) == 0 {
2✔
1529
                        delete(cont.nodeOpflexDevice, node)
1✔
1530
                }
1✔
1531
        }
1532

1533
        // For clusters other than OpenShift On OpenStack,
1534
        // openStackFabricPathDnMap will be empty
1535
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
NEW
1536
                if _, ok := opflexOdevInfo.opflexODevDn[dn]; ok {
×
NEW
1537
                        cont.log.Info("Received OpenStack opflexODev delete notification for ", dn)
×
NEW
1538
                        delete(opflexOdevInfo.opflexODevDn, dn)
×
NEW
1539
                        if len(opflexOdevInfo.opflexODevDn) < 1 {
×
NEW
1540
                                delete(cont.openStackFabricPathDnMap, host)
×
NEW
1541
                                cont.log.Info("OpenStack opflexODev of host ", host, " is deleted from cache")
×
NEW
1542
                                dnFound = true
×
NEW
1543
                        } else {
×
NEW
1544
                                cont.openStackFabricPathDnMap[host] = opflexOdevInfo
×
NEW
1545
                        }
×
NEW
1546
                        break
×
1547
                }
1548
        }
1549
        cont.indexMutex.Unlock()
1✔
1550

1✔
1551
        if dnFound {
2✔
1552
                cont.postOpflexDeviceDelete(nodeUpdates)
1✔
1553
        }
1✔
1554
}
1555

1556
func (cont *AciController) writeApicSvc(key string, service *v1.Service) {
1✔
1557
        if cont.config.ChainedMode {
1✔
1558
                return
×
1559
        }
×
1560
        aobj := apicapi.NewVmmInjectedSvc(cont.vmmDomainProvider(),
1✔
1561
                cont.config.AciVmmDomain, cont.config.AciVmmController,
1✔
1562
                service.Namespace, service.Name)
1✔
1563
        aobjDn := aobj.GetDn()
1✔
1564
        aobj.SetAttr("guid", string(service.UID))
1✔
1565

1✔
1566
        svcns := service.ObjectMeta.Namespace
1✔
1567
        _, exists, err := cont.namespaceIndexer.GetByKey(svcns)
1✔
1568
        if err != nil {
1✔
1569
                cont.log.Error("Failed to lookup ns : ", svcns, " ", err)
×
1570
                return
×
1571
        }
×
1572
        if !exists {
2✔
1573
                cont.log.Debug("Namespace of service ", service.ObjectMeta.Name, ": ", svcns, " doesn't exist, hence not sending an update to the APIC")
1✔
1574
                return
1✔
1575
        }
1✔
1576

1577
        if !cont.serviceEndPoints.SetServiceApicObject(aobj, service) {
2✔
1578
                return
1✔
1579
        }
1✔
1580
        var setApicSvcDnsName bool
1✔
1581
        if len(cont.config.ApicHosts) != 0 && apicapi.ApicVersion >= "5.1" {
1✔
1582
                setApicSvcDnsName = true
×
1583
        }
×
1584
        // APIC model only allows one of these
1585
        for _, ingress := range service.Status.LoadBalancer.Ingress {
1✔
1586
                if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1587
                        aobj.SetAttr("lbIp", ingress.IP)
×
1588
                } else if ingress.Hostname != "" {
×
1589
                        ipList, err := net.LookupHost(ingress.Hostname)
×
1590
                        if err == nil && len(ipList) > 0 {
×
1591
                                aobj.SetAttr("lbIp", ipList[0])
×
1592
                        } else {
×
1593
                                cont.log.Errorf("Lookup: err: %v, ipList: %+v", err, ipList)
×
1594
                        }
×
1595
                }
1596
                break
×
1597
        }
1598
        if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != "None" {
2✔
1599
                aobj.SetAttr("clusterIp", service.Spec.ClusterIP)
1✔
1600
        }
1✔
1601

1602
        var t string
1✔
1603
        switch service.Spec.Type {
1✔
1604
        case v1.ServiceTypeClusterIP:
×
1605
                t = "clusterIp"
×
1606
        case v1.ServiceTypeNodePort:
×
1607
                t = "nodePort"
×
1608
        case v1.ServiceTypeLoadBalancer:
1✔
1609
                t = "loadBalancer"
1✔
1610
        case v1.ServiceTypeExternalName:
×
1611
                t = "externalName"
×
1612
        }
1613
        if t != "" {
2✔
1614
                aobj.SetAttr("type", t)
1✔
1615
        }
1✔
1616

1617
        if setApicSvcDnsName || cont.config.Flavor == "k8s-overlay" {
1✔
1618
                dnsName := fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace)
×
1619

×
1620
                for _, ingress := range service.Status.LoadBalancer.Ingress {
×
1621
                        if ingress.Hostname != "" {
×
1622
                                aobj.SetAttr("dnsName", ingress.Hostname)
×
1623
                        } else if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1624
                                aobj.SetAttr("dnsName", dnsName)
×
1625
                        }
×
1626
                }
1627
                if t == "clusterIp" || t == "nodePort" || t == "externalName" {
×
1628
                        aobj.SetAttr("dnsName", dnsName)
×
1629
                }
×
1630
        }
1631
        for _, port := range service.Spec.Ports {
2✔
1632
                proto := getProtocolStr(port.Protocol)
1✔
1633
                p := apicapi.NewVmmInjectedSvcPort(aobjDn,
1✔
1634
                        strconv.Itoa(int(port.Port)), proto, port.TargetPort.String())
1✔
1635
                p.SetAttr("nodePort", strconv.Itoa(int(port.NodePort)))
1✔
1636
                aobj.AddChild(p)
1✔
1637
        }
1✔
1638
        if cont.config.EnableVmmInjectedLabels && service.ObjectMeta.Labels != nil && apicapi.ApicVersion >= "5.2" {
1✔
1639
                for key, val := range service.ObjectMeta.Labels {
×
1640
                        newLabelKey := cont.aciNameForKey("label", key)
×
1641
                        label := apicapi.NewVmmInjectedLabel(aobj.GetDn(),
×
1642
                                newLabelKey, val)
×
1643
                        aobj.AddChild(label)
×
1644
                }
×
1645
        }
1646
        name := cont.aciNameForKey("service-vmm", key)
1✔
1647
        cont.log.Debug("Write Service Object: ", aobj)
1✔
1648
        cont.apicConn.WriteApicObjects(name, apicapi.ApicSlice{aobj})
1✔
1649
        cont.log.Debugf("svcObject: %+v", aobj)
1✔
1650
}
1651

1652
func removeAllConditions(conditions []metav1.Condition, conditionType string) []metav1.Condition {
1✔
1653
        i := 0
1✔
1654
        for _, cond := range conditions {
1✔
1655
                if cond.Type != conditionType {
×
1656
                        conditions[i] = cond
×
1657
                }
×
1658
        }
1659
        return conditions[:i]
1✔
1660
}
1661

1662
func (cont *AciController) updateServiceCondition(service *v1.Service, success bool, reason string, message string) error {
1✔
1663
        conditionType := "LbIpamAllocation"
1✔
1664

1✔
1665
        var err error
1✔
1666
        var condition metav1.Condition
1✔
1667
        if success {
2✔
1668
                condition.Status = metav1.ConditionTrue
1✔
1669
        } else {
2✔
1670
                condition.Status = metav1.ConditionFalse
1✔
1671
                condition.Message = message
1✔
1672
        }
1✔
1673
        condition.Type = conditionType
1✔
1674
        condition.Reason = reason
1✔
1675
        condition.LastTransitionTime = metav1.Time{time.Now()}
1✔
1676
        for _, cond := range service.Status.Conditions {
2✔
1677
                if cond.Type == conditionType &&
1✔
1678
                        cond.Status == condition.Status &&
1✔
1679
                        cond.Message == condition.Message &&
1✔
1680
                        cond.Reason == condition.Reason {
2✔
1681
                        return nil
1✔
1682
                }
1✔
1683
        }
1684

1685
        service.Status.Conditions = removeAllConditions(service.Status.Conditions, conditionType)
1✔
1686
        service.Status.Conditions = append(service.Status.Conditions, condition)
1✔
1687
        _, err = cont.updateServiceStatus(service)
1✔
1688
        return err
1✔
1689
}
1690

1691
func (cont *AciController) validateRequestedIps(lbIpList []string) (net.IP, net.IP, bool) {
1✔
1692
        var ipv4, ipv6 net.IP
1✔
1693
        for _, lbIp := range lbIpList {
2✔
1694
                ip := net.ParseIP(lbIp)
1✔
1695
                if ip != nil {
2✔
1696
                        if ip.To4() != nil {
2✔
1697
                                if ipv4.Equal(net.IP{}) {
2✔
1698
                                        ipv4 = ip
1✔
1699
                                } else {
2✔
1700
                                        cont.log.Error("Annotation should have only one ipv4")
1✔
1701
                                        return ipv4, ipv6, false
1✔
1702
                                }
1✔
1703
                        } else if ip.To16() != nil {
2✔
1704
                                if ipv6.Equal(net.IP{}) {
2✔
1705
                                        ipv6 = ip
1✔
1706
                                } else {
2✔
1707
                                        cont.log.Error("Annotation should have only one ipv6")
1✔
1708
                                        return ipv4, ipv6, false
1✔
1709
                                }
1✔
1710
                        }
1711
                }
1712
        }
1713
        return ipv4, ipv6, true
1✔
1714
}
1715

1716
func (cont *AciController) returnUnusedStaticIngressIps(staticIngressIps, requestedIps []net.IP) {
1✔
1717
        for _, staticIp := range staticIngressIps {
2✔
1718
                found := false
1✔
1719
                for _, reqIp := range requestedIps {
2✔
1720
                        if reqIp.Equal(staticIp) {
2✔
1721
                                found = true
1✔
1722
                        }
1✔
1723
                }
1724
                if !found {
1✔
1725
                        returnIps(cont.staticServiceIps, []net.IP{staticIp})
×
1726
                }
×
1727
        }
1728
}
1729

1730
func (cont *AciController) allocateServiceIps(servicekey string,
1731
        service *v1.Service) bool {
1✔
1732
        logger := serviceLogger(cont.log, service)
1✔
1733
        cont.indexMutex.Lock()
1✔
1734
        meta, ok := cont.serviceMetaCache[servicekey]
1✔
1735
        if !ok {
2✔
1736
                meta = &serviceMeta{}
1✔
1737
                cont.serviceMetaCache[servicekey] = meta
1✔
1738

1✔
1739
                // Read any existing IPs and attempt to allocate them to the pod
1✔
1740
                for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1741
                        ip := net.ParseIP(ingress.IP)
1✔
1742
                        if ip == nil {
1✔
1743
                                continue
×
1744
                        }
1745
                        if ip.To4() != nil {
2✔
1746
                                if cont.serviceIps.GetV4IpCache()[0].RemoveIp(ip) {
2✔
1747
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1748
                                } else if cont.staticServiceIps.V4.RemoveIp(ip) {
3✔
1749
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1750
                                }
1✔
1751
                        } else if ip.To16() != nil {
2✔
1752
                                if cont.serviceIps.GetV6IpCache()[0].RemoveIp(ip) {
2✔
1753
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1754
                                } else if cont.staticServiceIps.V6.RemoveIp(ip) {
3✔
1755
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1756
                                }
1✔
1757
                        }
1758
                }
1759
        }
1760

1761
        if !cont.serviceSyncEnabled {
2✔
1762
                cont.indexMutex.Unlock()
1✔
1763
                return false
1✔
1764
        }
1✔
1765

1766
        var requestedIps []net.IP
1✔
1767
        // try to give the requested load balancer IP to the pod
1✔
1768
        lbIps, ok := service.ObjectMeta.Annotations[metadata.LbIpAnnotation]
1✔
1769
        if ok {
2✔
1770
                lbIpList := strings.Split(lbIps, ",")
1✔
1771
                ipv4, ipv6, valid := cont.validateRequestedIps(lbIpList)
1✔
1772
                if valid {
2✔
1773
                        if ipv4 != nil {
2✔
1774
                                requestedIps = append(requestedIps, ipv4)
1✔
1775
                        }
1✔
1776
                        if ipv6 != nil {
2✔
1777
                                requestedIps = append(requestedIps, ipv6)
1✔
1778
                        }
1✔
1779
                } else {
1✔
1780
                        cont.returnServiceIps(meta.ingressIps)
1✔
1781
                        cont.log.Error("Invalid LB IP annotation for service ", servicekey)
1✔
1782
                        err := cont.updateServiceCondition(service, false, "InvalidAnnotation", "Invalid Loadbalancer IP annotation")
1✔
1783
                        if err != nil {
1✔
1784
                                logger.Error("Failed to update service status : ", err)
×
1785
                                cont.indexMutex.Unlock()
×
1786
                                return true
×
1787
                        }
×
1788
                        cont.indexMutex.Unlock()
1✔
1789
                        return false
1✔
1790
                }
1791
        } else {
1✔
1792
                requestedIp := net.ParseIP(service.Spec.LoadBalancerIP)
1✔
1793
                if requestedIp != nil {
2✔
1794
                        requestedIps = append(requestedIps, requestedIp)
1✔
1795
                }
1✔
1796
        }
1797
        if len(requestedIps) > 0 {
2✔
1798
                meta.requestedIps = []net.IP{}
1✔
1799
                for _, requestedIp := range requestedIps {
2✔
1800
                        hasRequestedIp := false
1✔
1801
                        for _, ip := range meta.staticIngressIps {
2✔
1802
                                if reflect.DeepEqual(requestedIp, ip) {
2✔
1803
                                        hasRequestedIp = true
1✔
1804
                                }
1✔
1805
                        }
1806
                        if !hasRequestedIp {
2✔
1807
                                if requestedIp.To4() != nil &&
1✔
1808
                                        cont.staticServiceIps.V4.RemoveIp(requestedIp) {
2✔
1809
                                        hasRequestedIp = true
1✔
1810
                                } else if requestedIp.To16() != nil &&
2✔
1811
                                        cont.staticServiceIps.V6.RemoveIp(requestedIp) {
2✔
1812
                                        hasRequestedIp = true
1✔
1813
                                }
1✔
1814
                        }
1815
                        if hasRequestedIp {
2✔
1816
                                meta.requestedIps = append(meta.requestedIps, requestedIp)
1✔
1817
                        }
1✔
1818
                }
1819
                cont.returnUnusedStaticIngressIps(meta.staticIngressIps, meta.requestedIps)
1✔
1820
                meta.staticIngressIps = meta.requestedIps
1✔
1821
                cont.returnServiceIps(meta.ingressIps)
1✔
1822
                meta.ingressIps = nil
1✔
1823
                // If no requested ips are allocatable
1✔
1824
                if len(meta.requestedIps) < 1 {
2✔
1825
                        logger.Error("No Requested Ip addresses available for service ", servicekey)
1✔
1826
                        err := cont.updateServiceCondition(service, false, "RequestedIpsNotAllocatable", "The requested ips for loadbalancer service are not available or not in extern static range")
1✔
1827
                        if err != nil {
1✔
1828
                                cont.indexMutex.Unlock()
×
1829
                                logger.Error("Failed to update service status: ", err)
×
1830
                                return true
×
1831
                        }
×
1832
                        cont.indexMutex.Unlock()
1✔
1833
                        return false
1✔
1834
                }
1835
        } else if len(meta.requestedIps) > 0 {
1✔
1836
                meta.requestedIps = nil
×
1837
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
×
1838
                meta.staticIngressIps = nil
×
1839
        }
×
1840
        ingressIps := make([]net.IP, 0)
1✔
1841
        ingressIps = append(ingressIps, meta.ingressIps...)
1✔
1842
        ingressIps = append(ingressIps, meta.staticIngressIps...)
1✔
1843

1✔
1844
        var ipv4, ipv6 net.IP
1✔
1845
        for _, ip := range ingressIps {
2✔
1846
                if ip.To4() != nil {
2✔
1847
                        ipv4 = ip
1✔
1848
                } else if ip.To16() != nil {
3✔
1849
                        ipv6 = ip
1✔
1850
                }
1✔
1851
        }
1852
        var clusterIPv4, clusterIPv6 net.IP
1✔
1853
        clusterIPs := append([]string{service.Spec.ClusterIP}, service.Spec.ClusterIPs...)
1✔
1854
        for _, ipStr := range clusterIPs {
2✔
1855
                ip := net.ParseIP(ipStr)
1✔
1856
                if ip == nil {
1✔
1857
                        continue
×
1858
                }
1859
                if ip.To4() != nil && clusterIPv4 == nil {
2✔
1860
                        clusterIPv4 = ip
1✔
1861
                } else if ip.To16() != nil && strings.Contains(ip.String(), ":") && clusterIPv6 == nil {
3✔
1862
                        clusterIPv6 = ip
1✔
1863
                }
1✔
1864
        }
1865
        if clusterIPv4 != nil && ipv4 == nil {
2✔
1866
                if len(requestedIps) < 1 {
2✔
1867
                        ipv4, _ = cont.serviceIps.AllocateIp(true)
1✔
1868
                        if ipv4 != nil {
2✔
1869
                                ingressIps = append(ingressIps, ipv4)
1✔
1870
                        }
1✔
1871
                }
1872
        } else if clusterIPv4 == nil && ipv4 != nil {
1✔
1873
                cont.removeIpFromIngressIPList(&ingressIps, ipv4)
×
1874
        }
×
1875

1876
        if clusterIPv6 != nil && ipv6 == nil {
2✔
1877
                if len(requestedIps) < 1 {
2✔
1878
                        ipv6, _ = cont.serviceIps.AllocateIp(false)
1✔
1879
                        if ipv6 != nil {
2✔
1880
                                ingressIps = append(ingressIps, ipv6)
1✔
1881
                        }
1✔
1882
                }
1883
        } else if clusterIPv6 == nil && ipv6 != nil {
1✔
1884
                cont.removeIpFromIngressIPList(&ingressIps, ipv6)
×
1885
        }
×
1886

1887
        meta.ingressIps = ingressIps
1✔
1888
        if ipv4 == nil && ipv6 == nil {
2✔
1889
                logger.Error("No IP addresses available for service")
1✔
1890
                cont.indexMutex.Unlock()
1✔
1891
                return true
1✔
1892
        }
1✔
1893
        cont.indexMutex.Unlock()
1✔
1894
        var newIngress []v1.LoadBalancerIngress
1✔
1895
        for _, ip := range meta.ingressIps {
2✔
1896
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
1897
        }
1✔
1898

1899
        if !reflect.DeepEqual(newIngress, service.Status.LoadBalancer.Ingress) {
2✔
1900
                service.Status.LoadBalancer.Ingress = newIngress
1✔
1901

1✔
1902
                _, err := cont.updateServiceStatus(service)
1✔
1903
                if err != nil {
1✔
UNCOV
1904
                        logger.Error("Failed to update service: ", err)
×
UNCOV
1905
                        return true
×
1906
                } else {
1✔
1907
                        logger.WithFields(logrus.Fields{
1✔
1908
                                "status": service.Status.LoadBalancer.Ingress,
1✔
1909
                        }).Info("Updated service load balancer status")
1✔
1910
                }
1✔
1911
        }
1912

1913
        success := true
1✔
1914
        reason := "Success"
1✔
1915
        message := ""
1✔
1916
        if len(requestedIps) > 0 && len(requestedIps) != len(meta.ingressIps) {
1✔
UNCOV
1917
                success = false
×
UNCOV
1918
                reason = "OneIpNotAllocatable"
×
UNCOV
1919
                message = "One of the requested Ips is not allocatable"
×
UNCOV
1920
        }
×
1921
        err := cont.updateServiceCondition(service, success, reason, message)
1✔
1922
        if err != nil {
1✔
1923
                logger.Error("Failed to update service status: ", err)
×
1924
                return true
×
1925
        }
×
1926
        return false
1✔
1927
}
1928

1929
func (cont *AciController) handleServiceDelete(servicekey string) bool {
1✔
1930
        if cont.config.ChainedMode {
1✔
UNCOV
1931
                return false
×
UNCOV
1932
        }
×
1933
        cont.clearLbService(servicekey)
1✔
1934
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("service-vmm",
1✔
1935
                servicekey))
1✔
1936
        return false
1✔
1937
}
1938

1939
func (cont *AciController) handleServiceUpdate(service *v1.Service) bool {
1✔
1940
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
1941
        if err != nil {
1✔
UNCOV
1942
                serviceLogger(cont.log, service).
×
UNCOV
1943
                        Error("Could not create service key: ", err)
×
UNCOV
1944
                return false
×
UNCOV
1945
        }
×
1946
        if cont.config.ChainedMode {
1✔
1947
                return false
×
1948
        }
×
1949
        var requeue bool
1✔
1950
        isLoadBalancer := service.Spec.Type == v1.ServiceTypeLoadBalancer
1✔
1951
        aciLB := cont.config.LBType == lbTypeAci
1✔
1952
        if isLoadBalancer && aciLB {
2✔
1953
                if *cont.config.AllocateServiceIps {
2✔
1954
                        requeue = cont.allocateServiceIps(servicekey, service)
1✔
1955
                }
1✔
1956
                cont.indexMutex.Lock()
1✔
1957
                if cont.serviceSyncEnabled {
2✔
1958
                        cont.indexMutex.Unlock()
1✔
1959
                        err = cont.updateServiceDeviceInstance(servicekey, service)
1✔
1960
                        if err != nil {
1✔
UNCOV
1961
                                serviceLogger(cont.log, service).
×
UNCOV
1962
                                        Error("Failed to update service device Instance: ", err)
×
UNCOV
1963
                                return true
×
UNCOV
1964
                        }
×
1965
                } else {
1✔
1966
                        cont.indexMutex.Unlock()
1✔
1967
                }
1✔
1968
        } else if aciLB {
2✔
1969
                cont.clearLbService(servicekey)
1✔
1970
        }
1✔
1971
        cont.writeApicSvc(servicekey, service)
1✔
1972
        return requeue
1✔
1973
}
1974

1975
func (cont *AciController) clearLbService(servicekey string) {
1✔
1976
        cont.indexMutex.Lock()
1✔
1977
        if meta, ok := cont.serviceMetaCache[servicekey]; ok {
2✔
1978
                cont.returnServiceIps(meta.ingressIps)
1✔
1979
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
1✔
1980
                delete(cont.serviceMetaCache, servicekey)
1✔
1981
        }
1✔
1982
        cont.indexMutex.Unlock()
1✔
1983
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("svc", servicekey))
1✔
1984
}
1985

1986
func getEndpointsIps(endpoints *v1.Endpoints) map[string]bool {
1✔
1987
        ips := make(map[string]bool)
1✔
1988
        for _, subset := range endpoints.Subsets {
2✔
1989
                for _, addr := range subset.Addresses {
2✔
1990
                        ips[addr.IP] = true
1✔
1991
                }
1✔
1992
                for _, addr := range subset.NotReadyAddresses {
1✔
UNCOV
1993
                        ips[addr.IP] = true
×
UNCOV
1994
                }
×
1995
        }
1996
        return ips
1✔
1997
}
1998

1999
func getServiceTargetPorts(service *v1.Service) map[string]targetPort {
1✔
2000
        ports := make(map[string]targetPort)
1✔
2001
        for _, port := range service.Spec.Ports {
2✔
2002
                portNum := port.TargetPort.IntValue()
1✔
2003
                if portNum <= 0 {
2✔
2004
                        portNum = int(port.Port)
1✔
2005
                }
1✔
2006
                key := portProto(&port.Protocol) + "-num-" + strconv.Itoa(portNum)
1✔
2007
                ports[key] = targetPort{
1✔
2008
                        proto: port.Protocol,
1✔
2009
                        ports: []int{portNum},
1✔
2010
                }
1✔
2011
        }
2012
        return ports
1✔
2013
}
2014

2015
func (cont *AciController) endpointsAdded(obj interface{}) {
1✔
2016
        endpoints := obj.(*v1.Endpoints)
1✔
2017
        servicekey, err := cache.MetaNamespaceKeyFunc(obj.(*v1.Endpoints))
1✔
2018
        if err != nil {
1✔
UNCOV
2019
                cont.log.Error("Could not create service key: ", err)
×
UNCOV
2020
                return
×
UNCOV
2021
        }
×
2022

2023
        ips := getEndpointsIps(endpoints)
1✔
2024
        cont.indexMutex.Lock()
1✔
2025
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2026
        cont.queueIPNetPolUpdates(ips)
1✔
2027
        cont.indexMutex.Unlock()
1✔
2028

1✔
2029
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2030

1✔
2031
        cont.queueServiceUpdateByKey(servicekey)
1✔
2032
}
2033

2034
func (cont *AciController) endpointsDeleted(obj interface{}) {
1✔
2035
        endpoints, isEndpoints := obj.(*v1.Endpoints)
1✔
2036
        if !isEndpoints {
1✔
UNCOV
2037
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
UNCOV
2038
                if !ok {
×
UNCOV
2039
                        cont.log.Error("Received unexpected object: ", obj)
×
UNCOV
2040
                        return
×
UNCOV
2041
                }
×
2042
                endpoints, ok = deletedState.Obj.(*v1.Endpoints)
×
2043
                if !ok {
×
2044
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpoints object: ", deletedState.Obj)
×
2045
                        return
×
2046
                }
×
2047
        }
2048
        servicekey, err := cache.MetaNamespaceKeyFunc(endpoints)
1✔
2049
        if err != nil {
1✔
2050
                cont.log.Error("Could not create service key: ", err)
×
2051
                return
×
UNCOV
2052
        }
×
2053

2054
        ips := getEndpointsIps(endpoints)
1✔
2055
        cont.indexMutex.Lock()
1✔
2056
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2057
        cont.queueIPNetPolUpdates(ips)
1✔
2058
        cont.indexMutex.Unlock()
1✔
2059

1✔
2060
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2061

1✔
2062
        cont.queueServiceUpdateByKey(servicekey)
1✔
2063
}
2064

2065
func (cont *AciController) endpointsUpdated(oldEps, newEps interface{}) {
1✔
2066
        oldendpoints := oldEps.(*v1.Endpoints)
1✔
2067
        newendpoints := newEps.(*v1.Endpoints)
1✔
2068
        servicekey, err := cache.MetaNamespaceKeyFunc(newendpoints)
1✔
2069
        if err != nil {
1✔
UNCOV
2070
                cont.log.Error("Could not create service key: ", err)
×
UNCOV
2071
                return
×
UNCOV
2072
        }
×
2073

2074
        oldIps := getEndpointsIps(oldendpoints)
1✔
2075
        newIps := getEndpointsIps(newendpoints)
1✔
2076
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2077
                cont.indexMutex.Lock()
1✔
2078
                cont.queueIPNetPolUpdates(oldIps)
1✔
2079
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2080
                cont.queueIPNetPolUpdates(newIps)
1✔
2081
                cont.indexMutex.Unlock()
1✔
2082
        }
1✔
2083

2084
        if !reflect.DeepEqual(oldendpoints.Subsets, newendpoints.Subsets) {
2✔
2085
                cont.queueEndpointsNetPolUpdates(oldendpoints)
1✔
2086
                cont.queueEndpointsNetPolUpdates(newendpoints)
1✔
2087
        }
1✔
2088

2089
        cont.queueServiceUpdateByKey(servicekey)
1✔
2090
}
2091

2092
func (cont *AciController) serviceAdded(obj interface{}) {
1✔
2093
        service := obj.(*v1.Service)
1✔
2094
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2095
        if err != nil {
1✔
UNCOV
2096
                serviceLogger(cont.log, service).
×
UNCOV
2097
                        Error("Could not create service key: ", err)
×
UNCOV
2098
                return
×
UNCOV
2099
        }
×
2100

2101
        ports := getServiceTargetPorts(service)
1✔
2102
        cont.indexMutex.Lock()
1✔
2103
        cont.queuePortNetPolUpdates(ports)
1✔
2104
        cont.updateTargetPortIndex(true, servicekey, nil, ports)
1✔
2105
        cont.indexMutex.Unlock()
1✔
2106

1✔
2107
        cont.queueServiceUpdateByKey(servicekey)
1✔
2108
}
2109

2110
func (cont *AciController) serviceUpdated(oldSvc, newSvc interface{}) {
1✔
2111
        oldservice := oldSvc.(*v1.Service)
1✔
2112
        newservice := newSvc.(*v1.Service)
1✔
2113
        servicekey, err := cache.MetaNamespaceKeyFunc(newservice)
1✔
2114
        if err != nil {
1✔
UNCOV
2115
                serviceLogger(cont.log, newservice).
×
UNCOV
2116
                        Error("Could not create service key: ", err)
×
UNCOV
2117
                return
×
UNCOV
2118
        }
×
2119
        oldPorts := getServiceTargetPorts(oldservice)
1✔
2120
        newPorts := getServiceTargetPorts(newservice)
1✔
2121
        if !reflect.DeepEqual(oldPorts, newPorts) {
1✔
2122
                cont.indexMutex.Lock()
×
2123
                cont.queuePortNetPolUpdates(oldPorts)
×
UNCOV
2124
                cont.updateTargetPortIndex(true, servicekey, oldPorts, newPorts)
×
UNCOV
2125
                cont.queuePortNetPolUpdates(newPorts)
×
UNCOV
2126
                cont.indexMutex.Unlock()
×
2127
        }
×
2128
        cont.queueServiceUpdateByKey(servicekey)
1✔
2129
}
2130

2131
func (cont *AciController) serviceDeleted(obj interface{}) {
1✔
2132
        service, isService := obj.(*v1.Service)
1✔
2133
        if !isService {
1✔
UNCOV
2134
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
UNCOV
2135
                if !ok {
×
UNCOV
2136
                        serviceLogger(cont.log, service).
×
UNCOV
2137
                                Error("Received unexpected object: ", obj)
×
UNCOV
2138
                        return
×
2139
                }
×
2140
                service, ok = deletedState.Obj.(*v1.Service)
×
2141
                if !ok {
×
2142
                        serviceLogger(cont.log, service).
×
2143
                                Error("DeletedFinalStateUnknown contained non-Services object: ", deletedState.Obj)
×
2144
                        return
×
2145
                }
×
2146
        }
2147
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2148
        if err != nil {
1✔
2149
                serviceLogger(cont.log, service).
×
2150
                        Error("Could not create service key: ", err)
×
UNCOV
2151
                return
×
UNCOV
2152
        }
×
2153

2154
        ports := getServiceTargetPorts(service)
1✔
2155
        cont.indexMutex.Lock()
1✔
2156
        cont.updateTargetPortIndex(true, servicekey, ports, nil)
1✔
2157
        cont.queuePortNetPolUpdates(ports)
1✔
2158
        delete(cont.snatServices, servicekey)
1✔
2159
        cont.indexMutex.Unlock()
1✔
2160

1✔
2161
        deletedServiceKey := "DELETED_" + servicekey
1✔
2162
        cont.queueServiceUpdateByKey(deletedServiceKey)
1✔
2163
}
2164

2165
func (cont *AciController) serviceFullSync() {
1✔
2166
        cache.ListAll(cont.serviceIndexer, labels.Everything(),
1✔
2167
                func(sobj interface{}) {
2✔
2168
                        cont.queueServiceUpdate(sobj.(*v1.Service))
1✔
2169
                })
1✔
2170
}
2171

2172
func (cont *AciController) getEndpointSliceIps(endpointSlice *discovery.EndpointSlice) map[string]bool {
1✔
2173
        ips := make(map[string]bool)
1✔
2174
        for _, endpoints := range endpointSlice.Endpoints {
2✔
2175
                for _, addr := range endpoints.Addresses {
2✔
2176
                        ips[addr] = true
1✔
2177
                }
1✔
2178
        }
2179
        return ips
1✔
2180
}
2181

UNCOV
2182
func (cont *AciController) notReadyEndpointPresent(endpointSlice *discovery.EndpointSlice) bool {
×
UNCOV
2183
        for _, endpoints := range endpointSlice.Endpoints {
×
UNCOV
2184
                if (endpoints.Conditions.Ready != nil && !*endpoints.Conditions.Ready) &&
×
UNCOV
2185
                        (endpoints.Conditions.Terminating == nil || !*endpoints.Conditions.Terminating) {
×
UNCOV
2186
                        return true
×
2187
                }
×
2188
        }
2189
        return false
×
2190
}
2191

2192
func (cont *AciController) getEndpointSliceEpIps(endpoints *discovery.Endpoint) map[string]bool {
×
UNCOV
2193
        ips := make(map[string]bool)
×
2194
        for _, addr := range endpoints.Addresses {
×
UNCOV
2195
                ips[addr] = true
×
UNCOV
2196
        }
×
2197
        return ips
×
2198
}
2199

2200
func (cont *AciController) processDelayedEpSlices() {
1✔
2201
        var processEps []DelayedEpSlice
1✔
2202
        cont.indexMutex.Lock()
1✔
2203
        for i := 0; i < len(cont.delayedEpSlices); i++ {
1✔
UNCOV
2204
                delayedepslice := cont.delayedEpSlices[i]
×
UNCOV
2205
                if time.Now().After(delayedepslice.DelayedTime) {
×
UNCOV
2206
                        var toprocess DelayedEpSlice
×
UNCOV
2207
                        err := util.DeepCopyObj(&delayedepslice, &toprocess)
×
UNCOV
2208
                        if err != nil {
×
2209
                                cont.log.Error(err)
×
2210
                                continue
×
2211
                        }
2212
                        processEps = append(processEps, toprocess)
×
2213
                        cont.delayedEpSlices = append(cont.delayedEpSlices[:i], cont.delayedEpSlices[i+1:]...)
×
2214
                }
2215
        }
2216

2217
        cont.indexMutex.Unlock()
1✔
2218
        for _, epslice := range processEps {
1✔
UNCOV
2219
                //ignore the epslice if newly added endpoint is not ready
×
UNCOV
2220
                if cont.notReadyEndpointPresent(epslice.NewEpSlice) {
×
UNCOV
2221
                        cont.log.Debug("Ignoring the update as the new endpoint is not ready : ", epslice.NewEpSlice)
×
UNCOV
2222
                } else {
×
UNCOV
2223
                        cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice)
×
2224
                        cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice)
×
2225
                }
×
2226
        }
2227
}
2228

2229
func (cont *AciController) endpointSliceAdded(obj interface{}) {
1✔
2230
        endpointslice, ok := obj.(*discovery.EndpointSlice)
1✔
2231
        if !ok {
1✔
UNCOV
2232
                cont.log.Error("error processing Endpointslice object: ", obj)
×
UNCOV
2233
                return
×
UNCOV
2234
        }
×
2235
        servicekey, valid := getServiceKey(endpointslice)
1✔
2236
        if !valid {
1✔
2237
                return
×
2238
        }
×
2239
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2240
        cont.indexMutex.Lock()
1✔
2241
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2242
        cont.queueIPNetPolUpdates(ips)
1✔
2243
        cont.indexMutex.Unlock()
1✔
2244

1✔
2245
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2246

1✔
2247
        cont.queueServiceUpdateByKey(servicekey)
1✔
2248
        cont.log.Info("EndPointSlice Object Added: ", servicekey)
1✔
2249
}
2250

2251
func (cont *AciController) endpointSliceDeleted(obj interface{}) {
1✔
2252
        endpointslice, isEndpointslice := obj.(*discovery.EndpointSlice)
1✔
2253
        if !isEndpointslice {
1✔
UNCOV
2254
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
UNCOV
2255
                if !ok {
×
UNCOV
2256
                        cont.log.Error("Received unexpected object: ", obj)
×
UNCOV
2257
                        return
×
UNCOV
2258
                }
×
2259
                endpointslice, ok = deletedState.Obj.(*discovery.EndpointSlice)
×
2260
                if !ok {
×
2261
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpointslice object: ", deletedState.Obj)
×
2262
                        return
×
2263
                }
×
2264
        }
2265
        servicekey, valid := getServiceKey(endpointslice)
1✔
2266
        if !valid {
1✔
2267
                return
×
2268
        }
×
2269
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2270
        cont.indexMutex.Lock()
1✔
2271
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2272
        cont.queueIPNetPolUpdates(ips)
1✔
2273
        cont.indexMutex.Unlock()
1✔
2274
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2275
        cont.queueServiceUpdateByKey(servicekey)
1✔
2276
}
2277

2278
// Checks if the given service is present in the user configured list of services
2279
// for pbr delay and if present, returns the servie specific delay if configured
UNCOV
2280
func (cont *AciController) svcInAddDelayList(name, ns string) (int, bool) {
×
UNCOV
2281
        for _, svc := range cont.config.ServiceGraphEndpointAddDelay.Services {
×
UNCOV
2282
                if svc.Name == name && svc.Namespace == ns {
×
UNCOV
2283
                        return svc.Delay, true
×
UNCOV
2284
                }
×
2285
        }
2286
        return 0, false
×
2287
}
2288

2289
// Check if the endpointslice update notification has any deletion of enpoint
UNCOV
2290
func (cont *AciController) isDeleteEndpointSlice(oldendpointslice, newendpointslice *discovery.EndpointSlice) bool {
×
2291
        del := false
×
UNCOV
2292

×
UNCOV
2293
        // if any endpoint is removed from endpontslice
×
UNCOV
2294
        if len(newendpointslice.Endpoints) < len(oldendpointslice.Endpoints) {
×
2295
                del = true
×
2296
        }
×
2297

2298
        if !del {
×
2299
                // if any one of the endpoint is in terminating state
×
2300
                for _, endpoint := range newendpointslice.Endpoints {
×
2301
                        if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
×
UNCOV
2302
                                del = true
×
2303
                                break
×
2304
                        }
2305
                }
2306
        }
2307
        if !del {
×
2308
                // if any one of endpoint moved from ready state to not-ready state
×
UNCOV
2309
                for ix := range oldendpointslice.Endpoints {
×
UNCOV
2310
                        oldips := cont.getEndpointSliceEpIps(&oldendpointslice.Endpoints[ix])
×
UNCOV
2311
                        for newIx := range newendpointslice.Endpoints {
×
2312
                                newips := cont.getEndpointSliceEpIps(&newendpointslice.Endpoints[newIx])
×
2313
                                if reflect.DeepEqual(oldips, newips) {
×
2314
                                        if (oldendpointslice.Endpoints[ix].Conditions.Ready != nil && *oldendpointslice.Endpoints[ix].Conditions.Ready) &&
×
2315
                                                (newendpointslice.Endpoints[newIx].Conditions.Ready != nil && !*newendpointslice.Endpoints[newIx].Conditions.Ready) {
×
2316
                                                del = true
×
2317
                                        }
×
2318
                                        break
×
2319
                                }
2320
                        }
2321
                }
2322
        }
2323
        return del
×
2324
}
2325

2326
func (cont *AciController) doendpointSliceUpdatedDelay(oldendpointslice *discovery.EndpointSlice,
UNCOV
2327
        newendpointslice *discovery.EndpointSlice) {
×
2328
        svc, ns, valid := getServiceNameAndNs(newendpointslice)
×
UNCOV
2329
        if !valid {
×
UNCOV
2330
                return
×
UNCOV
2331
        }
×
2332
        svckey, valid := getServiceKey(newendpointslice)
×
2333
        if !valid {
×
2334
                return
×
2335
        }
×
2336
        delay := cont.config.ServiceGraphEndpointAddDelay.Delay
×
2337
        svcDelay, exists := cont.svcInAddDelayList(svc, ns)
×
2338
        if svcDelay > 0 {
×
2339
                delay = svcDelay
×
2340
        }
×
2341
        delayedsvc := exists && delay > 0
×
2342
        if delayedsvc {
×
2343
                cont.log.Debug("Delay of ", delay, " seconds is applicable for svc :", svc, " in ns: ", ns)
×
2344
                var delayedepslice DelayedEpSlice
×
2345
                delayedepslice.OldEpSlice = oldendpointslice
×
2346
                delayedepslice.ServiceKey = svckey
×
2347
                delayedepslice.NewEpSlice = newendpointslice
×
2348
                currentTime := time.Now()
×
2349
                delayedepslice.DelayedTime = currentTime.Add(time.Duration(delay) * time.Second)
×
2350
                cont.indexMutex.Lock()
×
2351
                cont.delayedEpSlices = append(cont.delayedEpSlices, &delayedepslice)
×
2352
                cont.indexMutex.Unlock()
×
2353
        } else {
×
2354
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2355
        }
×
2356

2357
        if delayedsvc && cont.isDeleteEndpointSlice(oldendpointslice, newendpointslice) {
×
2358
                cont.log.Debug("Proceeding by ignoring delay as the update is due to delete of endpoint")
×
2359
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2360
        }
×
2361
}
2362

2363
func (cont *AciController) endpointSliceUpdated(oldobj, newobj interface{}) {
1✔
2364
        oldendpointslice, ok := oldobj.(*discovery.EndpointSlice)
1✔
2365
        if !ok {
1✔
UNCOV
2366
                cont.log.Error("error processing Endpointslice object: ", oldobj)
×
UNCOV
2367
                return
×
UNCOV
2368
        }
×
2369
        newendpointslice, ok := newobj.(*discovery.EndpointSlice)
1✔
2370
        if !ok {
1✔
2371
                cont.log.Error("error processing Endpointslice object: ", newobj)
×
2372
                return
×
2373
        }
×
2374
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
UNCOV
2375
                cont.doendpointSliceUpdatedDelay(oldendpointslice, newendpointslice)
×
2376
        } else {
1✔
2377
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
1✔
2378
        }
1✔
2379
}
2380

2381
func (cont *AciController) doendpointSliceUpdated(oldendpointslice *discovery.EndpointSlice,
2382
        newendpointslice *discovery.EndpointSlice) {
1✔
2383
        servicekey, valid := getServiceKey(newendpointslice)
1✔
2384
        if !valid {
1✔
UNCOV
2385
                return
×
UNCOV
2386
        }
×
2387
        oldIps := cont.getEndpointSliceIps(oldendpointslice)
1✔
2388
        newIps := cont.getEndpointSliceIps(newendpointslice)
1✔
2389
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2390
                cont.indexMutex.Lock()
1✔
2391
                cont.queueIPNetPolUpdates(oldIps)
1✔
2392
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2393
                cont.queueIPNetPolUpdates(newIps)
1✔
2394
                cont.indexMutex.Unlock()
1✔
2395
        }
1✔
2396

2397
        if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) {
2✔
2398
                cont.queueEndpointSliceNetPolUpdates(oldendpointslice)
1✔
2399
                cont.queueEndpointSliceNetPolUpdates(newendpointslice)
1✔
2400
        }
1✔
2401
        cont.log.Debug("EndPointSlice Object Update: ", servicekey)
1✔
2402
        cont.queueServiceUpdateByKey(servicekey)
1✔
2403
}
2404

2405
func (cont *AciController) queueEndpointSliceNetPolUpdates(endpointslice *discovery.EndpointSlice) {
1✔
2406
        for _, endpoint := range endpointslice.Endpoints {
2✔
2407
                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" ||
1✔
2408
                        endpoint.TargetRef.Namespace == "" || endpoint.TargetRef.Name == "" {
2✔
2409
                        continue
1✔
2410
                }
2411
                if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
1✔
UNCOV
2412
                        continue
×
2413
                }
2414
                podkey := endpoint.TargetRef.Namespace + "/" + endpoint.TargetRef.Name
1✔
2415
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
2416
                ps := make(map[string]bool)
1✔
2417
                for _, npkey := range npkeys {
2✔
2418
                        cont.queueNetPolUpdateByKey(npkey)
1✔
2419
                }
1✔
2420
                // Process if the  any matching namedport wildcard policy is present
2421
                // ignore np already processed policies
2422
                cont.queueMatchingNamedNp(ps, podkey)
1✔
2423
        }
2424
}
2425

2426
func getServiceKey(endPointSlice *discovery.EndpointSlice) (string, bool) {
1✔
2427
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
1✔
2428
        if !ok {
1✔
UNCOV
2429
                return "", false
×
UNCOV
2430
        }
×
2431
        return endPointSlice.ObjectMeta.Namespace + "/" + serviceName, true
1✔
2432
}
2433

2434
func getServiceNameAndNs(endPointSlice *discovery.EndpointSlice) (string, string, bool) {
×
2435
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
×
UNCOV
2436
        if !ok {
×
UNCOV
2437
                return "", "", false
×
UNCOV
2438
        }
×
2439
        return serviceName, endPointSlice.ObjectMeta.Namespace, true
×
2440
}
2441

2442
// can be called with index lock
2443
func (sep *serviceEndpoint) UpdateServicesForNode(nodename string) {
1✔
2444
        cont := sep.cont
1✔
2445
        cache.ListAll(cont.endpointsIndexer, labels.Everything(),
1✔
2446
                func(endpointsobj interface{}) {
2✔
2447
                        endpoints := endpointsobj.(*v1.Endpoints)
1✔
2448
                        for _, subset := range endpoints.Subsets {
2✔
2449
                                for _, addr := range subset.Addresses {
2✔
2450
                                        if addr.NodeName != nil && *addr.NodeName == nodename {
2✔
2451
                                                servicekey, err :=
1✔
2452
                                                        cache.MetaNamespaceKeyFunc(endpointsobj.(*v1.Endpoints))
1✔
2453
                                                if err != nil {
1✔
UNCOV
2454
                                                        cont.log.Error("Could not create endpoints key: ", err)
×
UNCOV
2455
                                                        return
×
UNCOV
2456
                                                }
×
2457
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2458
                                                return
1✔
2459
                                        }
2460
                                }
2461
                        }
2462
                })
2463
}
2464

2465
func (seps *serviceEndpointSlice) UpdateServicesForNode(nodename string) {
1✔
2466
        // 1. List all the endpointslice and check for matching nodename
1✔
2467
        // 2. if it matches trigger the Service update and mark it visited
1✔
2468
        cont := seps.cont
1✔
2469
        visited := make(map[string]bool)
1✔
2470
        cache.ListAll(cont.endpointSliceIndexer, labels.Everything(),
1✔
2471
                func(endpointSliceobj interface{}) {
2✔
2472
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2473
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2474
                                if endpoint.NodeName != nil && *endpoint.NodeName == nodename {
2✔
2475
                                        servicekey, valid := getServiceKey(endpointSlices)
1✔
2476
                                        if !valid {
1✔
UNCOV
2477
                                                return
×
UNCOV
2478
                                        }
×
2479
                                        if _, ok := visited[servicekey]; !ok {
2✔
2480
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2481
                                                visited[servicekey] = true
1✔
2482
                                                return
1✔
2483
                                        }
1✔
2484
                                }
2485
                        }
2486
                })
2487
}
2488
func (cont *AciController) setNodeMap(nodeMap map[string]*metadata.ServiceEndpoint, nodeName string) {
1✔
2489
        nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
2490
        if !ok {
1✔
UNCOV
2491
                return
×
UNCOV
2492
        }
×
2493
        _, ok = cont.fabricPathForNode(nodeName)
1✔
2494
        if !ok {
2✔
2495
                return
1✔
2496
        }
1✔
2497
        nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
2498
}
2499

2500
// 2 cases when epslices corresponding to given service is presnt in delayedEpSlices:
2501
//  1. endpoint not present in delayedEpSlices of the service
2502
//  2. endpoint present in delayedEpSlices of the service but in not ready state
2503
//
2504
// indexMutex lock must be acquired before calling the function
UNCOV
2505
func (cont *AciController) isDelayedEndpoint(endpoint *discovery.Endpoint, svckey string) bool {
×
UNCOV
2506
        delayed := false
×
UNCOV
2507
        endpointips := cont.getEndpointSliceEpIps(endpoint)
×
UNCOV
2508
        for _, delayedepslices := range cont.delayedEpSlices {
×
UNCOV
2509
                if delayedepslices.ServiceKey == svckey {
×
2510
                        var found bool
×
2511
                        epslice := delayedepslices.OldEpSlice
×
2512
                        for ix := range epslice.Endpoints {
×
2513
                                epips := cont.getEndpointSliceEpIps(&epslice.Endpoints[ix])
×
2514
                                if reflect.DeepEqual(endpointips, epips) {
×
2515
                                        // case 2
×
2516
                                        if epslice.Endpoints[ix].Conditions.Ready != nil && !*epslice.Endpoints[ix].Conditions.Ready {
×
2517
                                                delayed = true
×
2518
                                        }
×
2519
                                        found = true
×
2520
                                }
2521
                        }
2522
                        // case 1
2523
                        if !found {
×
2524
                                delayed = true
×
UNCOV
2525
                        }
×
2526
                }
2527
        }
2528
        return delayed
×
2529
}
2530

2531
// set nodemap only if endoint is ready and not in delayedEpSlices
2532
func (cont *AciController) setNodeMapDelay(nodeMap map[string]*metadata.ServiceEndpoint,
2533
        endpoint *discovery.Endpoint, service *v1.Service) {
×
UNCOV
2534
        svckey, err := cache.MetaNamespaceKeyFunc(service)
×
UNCOV
2535
        if err != nil {
×
UNCOV
2536
                cont.log.Error("Could not create service key: ", err)
×
UNCOV
2537
                return
×
2538
        }
×
2539
        if cont.config.NoWaitForServiceEpReadiness ||
×
2540
                (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {
×
2541
                if endpoint.NodeName != nil && *endpoint.NodeName != "" {
×
2542
                        // donot setNodeMap for endpoint if:
×
2543
                        //   endpoint is newly added
×
2544
                        //   endpoint status changed from not ready to ready
×
2545
                        if !cont.isDelayedEndpoint(endpoint, svckey) {
×
2546
                                cont.setNodeMap(nodeMap, *endpoint.NodeName)
×
2547
                        }
×
2548
                }
2549
        }
2550
}
2551

2552
func (sep *serviceEndpoint) GetnodesMetadata(key string,
2553
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2554
        cont := sep.cont
1✔
2555
        endpointsobj, exists, err := cont.endpointsIndexer.GetByKey(key)
1✔
2556
        if err != nil {
1✔
UNCOV
2557
                cont.log.Error("Could not lookup endpoints for " +
×
UNCOV
2558
                        key + ": " + err.Error())
×
UNCOV
2559
        }
×
2560
        if exists && endpointsobj != nil {
2✔
2561
                endpoints := endpointsobj.(*v1.Endpoints)
1✔
2562
                for _, subset := range endpoints.Subsets {
2✔
2563
                        for _, addr := range subset.Addresses {
2✔
2564
                                if addr.NodeName == nil {
2✔
2565
                                        continue
1✔
2566
                                }
2567
                                cont.setNodeMap(nodeMap, *addr.NodeName)
1✔
2568
                        }
2569
                }
2570
        }
2571
        cont.log.Info("NodeMap: ", nodeMap)
1✔
2572
}
2573

2574
func (seps *serviceEndpointSlice) GetnodesMetadata(key string,
2575
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2576
        cont := seps.cont
1✔
2577
        // 1. Get all the Endpoint slices matching the label service-name
1✔
2578
        // 2. update the node map matching with endpoints nodes name
1✔
2579
        label := map[string]string{"kubernetes.io/service-name": service.ObjectMeta.Name}
1✔
2580
        selector := labels.SelectorFromSet(label)
1✔
2581
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2582
                func(endpointSliceobj interface{}) {
2✔
2583
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2584
                        for ix := range endpointSlices.Endpoints {
2✔
2585
                                if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
UNCOV
2586
                                        cont.setNodeMapDelay(nodeMap, &endpointSlices.Endpoints[ix], service)
×
2587
                                } else if cont.config.NoWaitForServiceEpReadiness ||
1✔
2588
                                        (endpointSlices.Endpoints[ix].Conditions.Ready != nil && *endpointSlices.Endpoints[ix].Conditions.Ready) {
2✔
2589
                                        if endpointSlices.Endpoints[ix].NodeName != nil && *endpointSlices.Endpoints[ix].NodeName != "" {
2✔
2590
                                                cont.setNodeMap(nodeMap, *endpointSlices.Endpoints[ix].NodeName)
1✔
2591
                                        }
1✔
2592
                                }
2593
                        }
2594
                })
2595
        cont.log.Debug("NodeMap: ", nodeMap)
1✔
2596
}
2597

2598
func (sep *serviceEndpoint) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2599
        cont := sep.cont
1✔
2600
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
2601
        if err != nil {
1✔
UNCOV
2602
                serviceLogger(cont.log, service).
×
UNCOV
2603
                        Error("Could not create service key: ", err)
×
UNCOV
2604
                return false
×
UNCOV
2605
        }
×
2606
        endpointsobj, _, err := cont.endpointsIndexer.GetByKey(key)
1✔
2607
        if err != nil {
1✔
2608
                cont.log.Error("Could not lookup endpoints for " +
×
2609
                        key + ": " + err.Error())
×
2610
                return false
×
UNCOV
2611
        }
×
2612
        if endpointsobj != nil {
2✔
2613
                for _, subset := range endpointsobj.(*v1.Endpoints).Subsets {
2✔
2614
                        for _, addr := range subset.Addresses {
2✔
2615
                                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" {
1✔
2616
                                        continue
×
2617
                                }
2618
                                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(),
1✔
2619
                                        addr.TargetRef.Name))
1✔
2620
                        }
2621
                }
2622
        }
2623
        return true
1✔
2624
}
2625

2626
func (seps *serviceEndpointSlice) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2627
        cont := seps.cont
1✔
2628
        label := map[string]string{"kubernetes.io/service-name": service.ObjectMeta.Name}
1✔
2629
        selector := labels.SelectorFromSet(label)
1✔
2630
        epcount := 0
1✔
2631
        childs := make(map[string]struct{})
1✔
2632
        var exists = struct{}{}
1✔
2633
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2634
                func(endpointSliceobj interface{}) {
2✔
2635
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2636
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2637
                                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
1✔
UNCOV
2638
                                        continue
×
2639
                                }
2640
                                epcount++
1✔
2641
                                childs[endpoint.TargetRef.Name] = exists
1✔
2642
                                cont.log.Debug("EndPoint added: ", endpoint.TargetRef.Name)
1✔
2643
                        }
2644
                })
2645
        for child := range childs {
2✔
2646
                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(), child))
1✔
2647
        }
1✔
2648
        return epcount != 0
1✔
2649
}
2650

2651
func getProtocolStr(proto v1.Protocol) string {
1✔
2652
        var protostring string
1✔
2653
        switch proto {
1✔
2654
        case v1.ProtocolUDP:
1✔
2655
                protostring = "udp"
1✔
2656
        case v1.ProtocolTCP:
1✔
2657
                protostring = "tcp"
1✔
UNCOV
2658
        case v1.ProtocolSCTP:
×
UNCOV
2659
                protostring = "sctp"
×
UNCOV
2660
        default:
×
UNCOV
2661
                protostring = "tcp"
×
2662
        }
2663
        return protostring
1✔
2664
}
2665

2666
func (cont *AciController) removeIpFromIngressIPList(ingressIps *[]net.IP, ip net.IP) {
×
UNCOV
2667
        cont.returnServiceIps([]net.IP{ip})
×
UNCOV
2668
        index := -1
×
UNCOV
2669
        for i, v := range *ingressIps {
×
UNCOV
2670
                if v.Equal(ip) {
×
2671
                        index = i
×
2672
                        break
×
2673
                }
2674
        }
2675
        if index == -1 {
×
2676
                return
×
2677
        }
×
UNCOV
2678
        *ingressIps = append((*ingressIps)[:index], (*ingressIps)[index+1:]...)
×
2679
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc