• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 10657

21 May 2025 10:05PM UTC coverage: 68.948% (-0.004%) from 68.952%
10657

push

travis-pro

web-flow
Merge pull request #1527 from noironetworks/resilient_hashing_backport

Enable Resilient Hashing in LoadBalancer PBR

2 of 4 new or added lines in 2 files covered. (50.0%)

7 existing lines in 3 files now uncovered.

13320 of 19319 relevant lines covered (68.95%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.88
/pkg/controller/services.go
1
// Copyright 2016 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "errors"
19
        "fmt"
20
        "net"
21
        "reflect"
22
        "regexp"
23
        "sort"
24
        "strconv"
25
        "strings"
26
        "time"
27

28
        "github.com/noironetworks/aci-containers/pkg/apicapi"
29
        "github.com/noironetworks/aci-containers/pkg/metadata"
30
        "github.com/noironetworks/aci-containers/pkg/util"
31
        "github.com/sirupsen/logrus"
32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
        "k8s.io/apimachinery/pkg/fields"
36
        "k8s.io/apimachinery/pkg/labels"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
)
40

41
// Default service contract scope value
42
const DefaultServiceContractScope = "context"
43

44
// Default service ext subnet scope - enable shared security
45
const DefaultServiceExtSubNetShared = false
46

47
func (cont *AciController) initEndpointsInformerFromClient(
48
        kubeClient kubernetes.Interface) {
×
49
        cont.initEndpointsInformerBase(
×
50
                cache.NewListWatchFromClient(
×
51
                        kubeClient.CoreV1().RESTClient(), "endpoints",
×
52
                        metav1.NamespaceAll, fields.Everything()))
×
53
}
×
54

55
func (cont *AciController) initEndpointSliceInformerFromClient(
56
        kubeClient kubernetes.Interface) {
×
57
        cont.initEndpointSliceInformerBase(
×
58
                cache.NewListWatchFromClient(
×
59
                        kubeClient.DiscoveryV1().RESTClient(), "endpointslices",
×
60
                        metav1.NamespaceAll, fields.Everything()))
×
61
}
×
62

63
func (cont *AciController) initEndpointSliceInformerBase(listWatch *cache.ListWatch) {
1✔
64
        cont.endpointSliceIndexer, cont.endpointSliceInformer = cache.NewIndexerInformer(
1✔
65
                listWatch, &discovery.EndpointSlice{}, 0,
1✔
66
                cache.ResourceEventHandlerFuncs{
1✔
67
                        AddFunc: func(obj interface{}) {
2✔
68
                                cont.endpointSliceAdded(obj)
1✔
69
                        },
1✔
70
                        UpdateFunc: func(oldobj interface{}, newobj interface{}) {
1✔
71
                                cont.endpointSliceUpdated(oldobj, newobj)
1✔
72
                        },
1✔
73
                        DeleteFunc: func(obj interface{}) {
1✔
74
                                cont.endpointSliceDeleted(obj)
1✔
75
                        },
1✔
76
                },
77
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
78
        )
79
}
80

81
func (cont *AciController) initEndpointsInformerBase(listWatch *cache.ListWatch) {
1✔
82
        cont.endpointsIndexer, cont.endpointsInformer = cache.NewIndexerInformer(
1✔
83
                listWatch, &v1.Endpoints{}, 0,
1✔
84
                cache.ResourceEventHandlerFuncs{
1✔
85
                        AddFunc: func(obj interface{}) {
2✔
86
                                cont.endpointsAdded(obj)
1✔
87
                        },
1✔
88
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
89
                                cont.endpointsUpdated(old, new)
1✔
90
                        },
1✔
91
                        DeleteFunc: func(obj interface{}) {
1✔
92
                                cont.endpointsDeleted(obj)
1✔
93
                        },
1✔
94
                },
95
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
96
        )
97
}
98

99
func (cont *AciController) initServiceInformerFromClient(
100
        kubeClient *kubernetes.Clientset) {
×
101
        cont.initServiceInformerBase(
×
102
                cache.NewListWatchFromClient(
×
103
                        kubeClient.CoreV1().RESTClient(), "services",
×
104
                        metav1.NamespaceAll, fields.Everything()))
×
105
}
×
106

107
func (cont *AciController) initServiceInformerBase(listWatch *cache.ListWatch) {
1✔
108
        cont.serviceIndexer, cont.serviceInformer = cache.NewIndexerInformer(
1✔
109
                listWatch, &v1.Service{}, 0,
1✔
110
                cache.ResourceEventHandlerFuncs{
1✔
111
                        AddFunc: func(obj interface{}) {
2✔
112
                                cont.serviceAdded(obj)
1✔
113
                        },
1✔
114
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
115
                                cont.serviceUpdated(old, new)
1✔
116
                        },
1✔
117
                        DeleteFunc: func(obj interface{}) {
×
118
                                cont.serviceDeleted(obj)
×
119
                        },
×
120
                },
121
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
122
        )
123
}
124

125
func serviceLogger(log *logrus.Logger, as *v1.Service) *logrus.Entry {
1✔
126
        return log.WithFields(logrus.Fields{
1✔
127
                "namespace": as.ObjectMeta.Namespace,
1✔
128
                "name":      as.ObjectMeta.Name,
1✔
129
                "type":      as.Spec.Type,
1✔
130
        })
1✔
131
}
1✔
132

133
func (cont *AciController) queueIPNetPolUpdates(ips map[string]bool) {
1✔
134
        for ipStr := range ips {
2✔
135
                ip := net.ParseIP(ipStr)
1✔
136
                if ip == nil {
1✔
137
                        continue
×
138
                }
139
                entries, err := cont.netPolSubnetIndex.ContainingNetworks(ip)
1✔
140
                if err != nil {
1✔
141
                        cont.log.Error("Corrupted network policy IP index, err: ", err)
×
142
                        return
×
143
                }
×
144
                for _, entry := range entries {
2✔
145
                        for npkey := range entry.(*ipIndexEntry).keys {
2✔
146
                                cont.queueNetPolUpdateByKey(npkey)
1✔
147
                        }
1✔
148
                }
149
        }
150
}
151

152
func (cont *AciController) queuePortNetPolUpdates(ports map[string]targetPort) {
1✔
153
        for portkey := range ports {
2✔
154
                entry := cont.targetPortIndex[portkey]
1✔
155
                if entry == nil {
2✔
156
                        continue
1✔
157
                }
158
                for npkey := range entry.networkPolicyKeys {
2✔
159
                        cont.queueNetPolUpdateByKey(npkey)
1✔
160
                }
1✔
161
        }
162
}
163

164
func (cont *AciController) queueNetPolForEpAddrs(addrs []v1.EndpointAddress) {
1✔
165
        for _, addr := range addrs {
2✔
166
                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" ||
1✔
167
                        addr.TargetRef.Namespace == "" || addr.TargetRef.Name == "" {
2✔
168
                        continue
1✔
169
                }
170
                podkey := addr.TargetRef.Namespace + "/" + addr.TargetRef.Name
1✔
171
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
172
                ps := make(map[string]bool)
1✔
173
                for _, npkey := range npkeys {
2✔
174
                        cont.queueNetPolUpdateByKey(npkey)
1✔
175
                        ps[npkey] = true
1✔
176
                }
1✔
177
                // Process if the  any matching namedport wildcard policy is present
178
                // ignore np already processed policies
179
                cont.queueMatchingNamedNp(ps, podkey)
1✔
180
        }
181
}
182

183
func (cont *AciController) queueMatchingNamedNp(served map[string]bool, podkey string) {
1✔
184
        cont.indexMutex.Lock()
1✔
185
        for npkey := range cont.nmPortNp {
2✔
186
                if _, ok := served[npkey]; !ok {
2✔
187
                        if cont.checkPodNmpMatchesNp(npkey, podkey) {
2✔
188
                                cont.queueNetPolUpdateByKey(npkey)
1✔
189
                        }
1✔
190
                }
191
        }
192
        cont.indexMutex.Unlock()
1✔
193
}
194
func (cont *AciController) queueEndpointsNetPolUpdates(endpoints *v1.Endpoints) {
1✔
195
        for _, subset := range endpoints.Subsets {
2✔
196
                cont.queueNetPolForEpAddrs(subset.Addresses)
1✔
197
                cont.queueNetPolForEpAddrs(subset.NotReadyAddresses)
1✔
198
        }
1✔
199
}
200

201
func (cont *AciController) returnServiceIps(ips []net.IP) {
1✔
202
        for _, ip := range ips {
2✔
203
                if ip.To4() != nil {
2✔
204
                        cont.serviceIps.DeallocateIp(ip)
1✔
205
                } else if ip.To16() != nil {
3✔
206
                        cont.serviceIps.DeallocateIp(ip)
1✔
207
                }
1✔
208
        }
209
}
210

211
func returnIps(pool *netIps, ips []net.IP) {
1✔
212
        for _, ip := range ips {
2✔
213
                if ip.To4() != nil {
2✔
214
                        pool.V4.AddIp(ip)
1✔
215
                } else if ip.To16() != nil {
3✔
216
                        pool.V6.AddIp(ip)
1✔
217
                }
1✔
218
        }
219
}
220

221
func (cont *AciController) staticMonPolName() string {
1✔
222
        return cont.aciNameForKey("monPol", cont.env.ServiceBd())
1✔
223
}
1✔
224

225
func (cont *AciController) staticMonPolDn() string {
1✔
226
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
227
                return fmt.Sprintf("uni/tn-%s/ipslaMonitoringPol-%s",
1✔
228
                        cont.config.AciVrfTenant, cont.staticMonPolName())
1✔
229
        }
1✔
230
        return ""
×
231
}
232

233
func (cont *AciController) staticServiceObjs() apicapi.ApicSlice {
1✔
234
        var serviceObjs apicapi.ApicSlice
1✔
235

1✔
236
        // Service bridge domain
1✔
237
        bdName := cont.aciNameForKey("bd", cont.env.ServiceBd())
1✔
238
        bd := apicapi.NewFvBD(cont.config.AciVrfTenant, bdName)
1✔
239
        if apicapi.ApicVersion >= "6.0(4c)" {
1✔
240
                bd.SetAttr("serviceBdRoutingDisable", "yes")
×
241
        }
×
242
        bd.SetAttr("arpFlood", "yes")
1✔
243
        bd.SetAttr("ipLearning", "no")
1✔
244
        bd.SetAttr("unkMacUcastAct", cont.config.UnknownMacUnicastAction)
1✔
245
        bdToOut := apicapi.NewRsBdToOut(bd.GetDn(), cont.config.AciL3Out)
1✔
246
        bd.AddChild(bdToOut)
1✔
247
        bdToVrf := apicapi.NewRsCtx(bd.GetDn(), cont.config.AciVrf)
1✔
248
        bd.AddChild(bdToVrf)
1✔
249

1✔
250
        bdn := bd.GetDn()
1✔
251
        for _, cidr := range cont.config.NodeServiceSubnets {
2✔
252
                sn := apicapi.NewFvSubnet(bdn, cidr)
1✔
253
                bd.AddChild(sn)
1✔
254
        }
1✔
255
        serviceObjs = append(serviceObjs, bd)
1✔
256

1✔
257
        // Service IP SLA monitoring policy
1✔
258
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
259
                monPol := apicapi.NewFvIPSLAMonitoringPol(cont.config.AciVrfTenant,
1✔
260
                        cont.staticMonPolName())
1✔
261
                monPol.SetAttr("slaFrequency",
1✔
262
                        strconv.Itoa(cont.config.AciServiceMonitorInterval))
1✔
263
                serviceObjs = append(serviceObjs, monPol)
1✔
264
        }
1✔
265

266
        return serviceObjs
1✔
267
}
268

269
func (cont *AciController) initStaticServiceObjs() {
1✔
270
        cont.apicConn.WriteApicObjects(cont.config.AciPrefix+"_service_static",
1✔
271
                cont.staticServiceObjs())
1✔
272
}
1✔
273

274
func (cont *AciController) updateServicesForNode(nodename string) {
1✔
275
        cont.serviceEndPoints.UpdateServicesForNode(nodename)
1✔
276
}
1✔
277

278
// must have index lock
279
func (cont *AciController) getActiveFabricPathDn(node string) string {
×
280
        var fabricPathDn string
×
281
        sz := len(cont.nodeOpflexDevice[node])
×
282
        for i := range cont.nodeOpflexDevice[node] {
×
283
                device := cont.nodeOpflexDevice[node][sz-1-i]
×
284
                if device.GetAttrStr("state") == "connected" {
×
285
                        fabricPathDn = device.GetAttrStr("fabricPathDn")
×
286
                        break
×
287
                }
288
        }
289
        return fabricPathDn
×
290
}
291

292
func (cont *AciController) getOpflexOdevCount(node string) (int, string) {
×
293
        fabricPathDnCount := make(map[string]int)
×
294
        var maxCount int
×
295
        var fabricPathDn string
×
296
        for _, device := range cont.nodeOpflexDevice[node] {
×
297
                fabricpathdn := device.GetAttrStr("fabricPathDn")
×
298
                count, ok := fabricPathDnCount[fabricpathdn]
×
299
                if ok {
×
300
                        fabricPathDnCount[fabricpathdn] = count + 1
×
301
                } else {
×
302
                        fabricPathDnCount[fabricpathdn] = 1
×
303
                }
×
304
                if fabricPathDnCount[fabricpathdn] >= maxCount {
×
305
                        maxCount = fabricPathDnCount[fabricpathdn]
×
306
                        fabricPathDn = fabricpathdn
×
307
                }
×
308
        }
309
        return maxCount, fabricPathDn
×
310
}
311

312
func deleteDevicesFromList(delDevices, devices apicapi.ApicSlice) apicapi.ApicSlice {
×
313
        var newDevices apicapi.ApicSlice
×
314
        for _, device := range devices {
×
315
                found := false
×
316
                for _, delDev := range delDevices {
×
317
                        if reflect.DeepEqual(delDev, device) {
×
318
                                found = true
×
319
                        }
×
320
                }
321
                if !found {
×
322
                        newDevices = append(newDevices, device)
×
323
                }
×
324
        }
325
        return newDevices
×
326
}
327

328
func (cont *AciController) getAciPodSubnet(pod string) (string, error) {
×
329
        podslice := strings.Split(pod, "-")
×
330
        if len(podslice) < 2 {
×
331
                return "", fmt.Errorf("Failed to get podid from pod")
×
332
        }
×
333
        podid := podslice[1]
×
334
        var subnet string
×
335
        args := []string{
×
336
                "query-target=self",
×
337
        }
×
338
        url := fmt.Sprintf("/api/node/mo/uni/controller/setuppol/setupp-%s.json?%s", podid, strings.Join(args, "&"))
×
339
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
340
        if err != nil {
×
341
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
342
                return subnet, err
×
343
        }
×
344
        for _, obj := range apicresp.Imdata {
×
345
                for _, body := range obj {
×
346
                        tepPool, ok := body.Attributes["tepPool"].(string)
×
347
                        if ok {
×
348
                                subnet = tepPool
×
349
                                break
×
350
                        }
351
                }
352
        }
353
        return subnet, nil
×
354
}
355

356
func (cont *AciController) createAciPodAnnotation(node string) (aciPodAnnot, error) {
×
357
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
358
        nodeAciPodAnnot := cont.nodeACIPod[node]
×
359
        isSingleOdev := nodeAciPodAnnot.isSingleOpflexOdev
×
360
        if (odevCount == 0) ||
×
361
                (odevCount == 1 && !isSingleOdev) ||
×
362
                (odevCount == 1 && isSingleOdev && !nodeAciPodAnnot.disconnectTime.IsZero()) {
×
363
                if nodeAciPodAnnot.aciPod != "none" {
×
364
                        if nodeAciPodAnnot.disconnectTime.IsZero() {
×
365
                                nodeAciPodAnnot.disconnectTime = time.Now()
×
366
                        } else {
×
367
                                currentTime := time.Now()
×
368
                                diff := currentTime.Sub(nodeAciPodAnnot.disconnectTime)
×
369
                                if diff.Seconds() > float64(cont.config.OpflexDeviceReconnectWaitTimeout) {
×
370
                                        nodeAciPodAnnot.aciPod = "none"
×
371
                                        nodeAciPodAnnot.disconnectTime = time.Time{}
×
372
                                }
×
373
                        }
374
                }
375
                if odevCount == 1 {
×
376
                        nodeAciPodAnnot.isSingleOpflexOdev = true
×
377
                }
×
378
                nodeAciPodAnnot.connectTime = time.Time{}
×
379
                return nodeAciPodAnnot, nil
×
380
        } else if (odevCount == 2) ||
×
381
                (odevCount == 1 && isSingleOdev && nodeAciPodAnnot.disconnectTime.IsZero()) {
×
382
                // when there is already a connected opflex device,
×
383
                // fabricPathDn will have latest pod iformation
×
384
                // and annotation will be in the form pod-<podid>-<subnet of pod>
×
385
                nodeAciPodAnnot.disconnectTime = time.Time{}
×
386
                nodeAciPod := nodeAciPodAnnot.aciPod
×
387
                if odevCount == 2 {
×
388
                        nodeAciPodAnnot.isSingleOpflexOdev = false
×
389
                }
×
390
                if odevCount == 1 && nodeAciPod == "none" {
×
391
                        if nodeAciPodAnnot.connectTime.IsZero() {
×
392
                                nodeAciPodAnnot.connectTime = time.Now()
×
393
                                return nodeAciPodAnnot, nil
×
394
                        } else {
×
395
                                currentTime := time.Now()
×
396
                                diff := currentTime.Sub(nodeAciPodAnnot.connectTime)
×
397
                                if diff.Seconds() < float64(10) {
×
398
                                        return nodeAciPodAnnot, nil
×
399
                                }
×
400
                        }
401
                }
402
                nodeAciPodAnnot.connectTime = time.Time{}
×
403
                pathSlice := strings.Split(fabricPathDn, "/")
×
404
                if len(pathSlice) > 1 {
×
405
                        pod := pathSlice[1]
×
406

×
407
                        // when there is difference in pod info avaliable from fabricPathDn
×
408
                        // and what we have in cache, update info in cache and change annotation on node
×
409
                        if !strings.Contains(nodeAciPod, pod) {
×
410
                                subnet, err := cont.getAciPodSubnet(pod)
×
411
                                if err != nil {
×
412
                                        cont.log.Error("Failed to get subnet of aci pod ", err.Error())
×
413
                                        return nodeAciPodAnnot, err
×
414
                                } else {
×
415
                                        nodeAciPodAnnot.aciPod = pod + "-" + subnet
×
416
                                        return nodeAciPodAnnot, nil
×
417
                                }
×
418
                        } else {
×
419
                                return nodeAciPodAnnot, nil
×
420
                        }
×
421
                } else {
×
422
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
423
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
424
                }
×
425
        }
426
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
427
}
428

429
func (cont *AciController) createNodeAciPodAnnotation(node string) (aciPodAnnot, error) {
×
430
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
431
        nodeAciPodAnnot := cont.nodeACIPodAnnot[node]
×
432
        isSingleOdev := nodeAciPodAnnot.isSingleOpflexOdev
×
433
        if (odevCount == 0) ||
×
434
                (odevCount == 1 && !isSingleOdev) {
×
435
                if nodeAciPodAnnot.aciPod != "none" {
×
436
                        nodeAciPodAnnot.aciPod = "none"
×
437
                }
×
438
                if odevCount == 1 {
×
439
                        nodeAciPodAnnot.isSingleOpflexOdev = true
×
440
                }
×
441
                nodeAciPodAnnot.connectTime = time.Time{}
×
442
                return nodeAciPodAnnot, nil
×
443
        } else if (odevCount == 2) ||
×
444
                (odevCount == 1 && isSingleOdev) {
×
445
                nodeAciPod := nodeAciPodAnnot.aciPod
×
446
                if odevCount == 2 {
×
447
                        nodeAciPodAnnot.isSingleOpflexOdev = false
×
448
                }
×
449
                if odevCount == 1 && nodeAciPod == "none" {
×
450
                        if nodeAciPodAnnot.connectTime.IsZero() {
×
451
                                nodeAciPodAnnot.connectTime = time.Now()
×
452
                                return nodeAciPodAnnot, nil
×
453
                        } else {
×
454
                                currentTime := time.Now()
×
455
                                diff := currentTime.Sub(nodeAciPodAnnot.connectTime)
×
456
                                if diff.Seconds() < float64(10) {
×
457
                                        return nodeAciPodAnnot, nil
×
458
                                }
×
459
                        }
460
                }
461
                nodeAciPodAnnot.connectTime = time.Time{}
×
462
                pathSlice := strings.Split(fabricPathDn, "/")
×
463
                if len(pathSlice) > 1 {
×
464

×
465
                        nodeAciPodAnnot.aciPod = pathSlice[1]
×
466
                        return nodeAciPodAnnot, nil
×
467
                } else {
×
468
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
469
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
470
                }
×
471
        }
472
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
473
}
474

475
func (cont *AciController) checkChangeOfOpflexOdevAciPod() {
×
476
        var nodeAnnotationUpdates []string
×
477
        cont.apicConn.SyncMutex.Lock()
×
478
        syncDone := cont.apicConn.SyncDone
×
479
        cont.apicConn.SyncMutex.Unlock()
×
480

×
481
        if !syncDone {
×
482
                return
×
483
        }
×
484

485
        cont.indexMutex.Lock()
×
486
        for node := range cont.nodeACIPodAnnot {
×
487
                annot, err := cont.createNodeAciPodAnnotation(node)
×
488
                if err != nil {
×
489
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
490
                                now := time.Now()
×
491
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
492
                                        annot.lastErrorTime = now
×
493
                                        cont.nodeACIPodAnnot[node] = annot
×
494
                                        cont.log.Error(err.Error())
×
495
                                }
×
496
                        } else {
×
497
                                cont.log.Error(err.Error())
×
498
                        }
×
499
                } else {
×
500
                        if annot != cont.nodeACIPodAnnot[node] {
×
501
                                cont.nodeACIPodAnnot[node] = annot
×
502
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
503
                        }
×
504
                }
505
        }
506
        cont.indexMutex.Unlock()
×
507
        if len(nodeAnnotationUpdates) > 0 {
×
508
                for _, updatednode := range nodeAnnotationUpdates {
×
509
                        go cont.env.NodeAnnotationChanged(updatednode)
×
510
                }
×
511
        }
512
}
513

514
func (cont *AciController) checkChangeOfOdevAciPod() {
×
515
        var nodeAnnotationUpdates []string
×
516
        cont.apicConn.SyncMutex.Lock()
×
517
        syncDone := cont.apicConn.SyncDone
×
518
        cont.apicConn.SyncMutex.Unlock()
×
519

×
520
        if !syncDone {
×
521
                return
×
522
        }
×
523

524
        cont.indexMutex.Lock()
×
525
        for node := range cont.nodeACIPod {
×
526
                annot, err := cont.createAciPodAnnotation(node)
×
527
                if err != nil {
×
528
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
529
                                now := time.Now()
×
530
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
531
                                        annot.lastErrorTime = now
×
532
                                        cont.nodeACIPod[node] = annot
×
533
                                        cont.log.Error(err.Error())
×
534
                                }
×
535
                        } else {
×
536
                                cont.log.Error(err.Error())
×
537
                        }
×
538
                } else {
×
539
                        if annot != cont.nodeACIPod[node] {
×
540
                                cont.nodeACIPod[node] = annot
×
541
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
542
                        }
×
543
                }
544
        }
545
        cont.indexMutex.Unlock()
×
546
        if len(nodeAnnotationUpdates) > 0 {
×
547
                for _, updatednode := range nodeAnnotationUpdates {
×
548
                        go cont.env.NodeAnnotationChanged(updatednode)
×
549
                }
×
550
        }
551
}
552

553
func (cont *AciController) deleteOldOpflexDevices() {
1✔
554
        var nodeUpdates []string
1✔
555
        cont.indexMutex.Lock()
1✔
556
        for node, devices := range cont.nodeOpflexDevice {
1✔
557
                var delDevices apicapi.ApicSlice
×
558
                fabricPathDn := cont.getActiveFabricPathDn(node)
×
559
                if fabricPathDn != "" {
×
560
                        for _, device := range devices {
×
561
                                if device.GetAttrStr("delete") == "true" && device.GetAttrStr("fabricPathDn") != fabricPathDn {
×
562
                                        deleteTimeStr := device.GetAttrStr("deleteTime")
×
563
                                        deleteTime, err := time.Parse(time.RFC3339, deleteTimeStr)
×
564
                                        if err != nil {
×
565
                                                cont.log.Error("Failed to parse opflex device delete time: ", err)
×
566
                                                continue
×
567
                                        }
568
                                        now := time.Now()
×
569
                                        diff := now.Sub(deleteTime)
×
570
                                        if diff.Seconds() >= cont.config.OpflexDeviceDeleteTimeout {
×
571
                                                delDevices = append(delDevices, device)
×
572
                                        }
×
573
                                }
574
                        }
575
                        if len(delDevices) > 0 {
×
576
                                newDevices := deleteDevicesFromList(delDevices, devices)
×
577
                                cont.nodeOpflexDevice[node] = newDevices
×
578
                                cont.log.Info("Opflex device list for node ", node, " after deleting stale entries: ", cont.nodeOpflexDevice[node])
×
579
                                if len(newDevices) == 0 {
×
580
                                        delete(cont.nodeOpflexDevice, node)
×
581
                                }
×
582
                                nodeUpdates = append(nodeUpdates, node)
×
583
                        }
584
                }
585
        }
586
        cont.indexMutex.Unlock()
1✔
587
        if len(nodeUpdates) > 0 {
1✔
588
                cont.postOpflexDeviceDelete(nodeUpdates)
×
589
        }
×
590
}
591

592
// must have index lock
593
func (cont *AciController) setDeleteFlagForOldDevices(node, fabricPathDn string) {
1✔
594
        for _, device := range cont.nodeOpflexDevice[node] {
2✔
595
                if device.GetAttrStr("fabricPathDn") != fabricPathDn {
1✔
596
                        t := time.Now()
×
597
                        device.SetAttr("delete", "true")
×
598
                        device.SetAttr("deleteTime", t.Format(time.RFC3339))
×
599
                }
×
600
        }
601
}
602

603
// must have index lock
604
func (cont *AciController) fabricPathForNode(name string) (string, bool) {
1✔
605
        sz := len(cont.nodeOpflexDevice[name])
1✔
606
        for i := range cont.nodeOpflexDevice[name] {
2✔
607
                device := cont.nodeOpflexDevice[name][sz-1-i]
1✔
608
                deviceState := device.GetAttrStr("state")
1✔
609
                if deviceState == "connected" {
2✔
610
                        if deviceState != device.GetAttrStr("prevState") {
2✔
611
                                cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Info("Processing fabric path for node ",
1✔
612
                                        "when connected device state is found")
1✔
613
                                device.SetAttr("prevState", deviceState)
1✔
614
                        }
1✔
615
                        fabricPathDn := device.GetAttrStr("fabricPathDn")
1✔
616
                        cont.setDeleteFlagForOldDevices(name, fabricPathDn)
1✔
617
                        return fabricPathDn, true
1✔
618
                } else {
1✔
619
                        device.SetAttr("prevState", deviceState)
1✔
620
                }
1✔
621
        }
622
        if sz > 0 {
2✔
623
                // When the opflex-device for a node changes, for example during a live migration,
1✔
624
                // we end up with both the old and the new device objects till the old object
1✔
625
                // ages out on APIC. The new object is at end of the devices list (see opflexDeviceChanged),
1✔
626
                // so we return the fabricPathDn of the last opflex-device.
1✔
627
                cont.fabricPathLogger(cont.nodeOpflexDevice[name][sz-1].GetAttrStr("hostName"),
1✔
628
                        cont.nodeOpflexDevice[name][sz-1]).Info("Processing fabricPathDn for node")
1✔
629
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("fabricPathDn"), true
1✔
630
        }
1✔
631
        return "", false
1✔
632
}
633

634
// must have index lock
635
func (cont *AciController) deviceMacForNode(name string) (string, bool) {
1✔
636
        sz := len(cont.nodeOpflexDevice[name])
1✔
637
        if sz > 0 {
2✔
638
                // When the opflex-device for a node changes, for example when the
1✔
639
                // node is recreated, we end up with both the old and the new
1✔
640
                // device objects till the old object ages out on APIC. The
1✔
641
                // new object is at end of the devices list (see
1✔
642
                // opflexDeviceChanged), so we return the MAC address of the
1✔
643
                // last opflex-device.
1✔
644
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("mac"), true
1✔
645
        }
1✔
646
        return "", false
1✔
647
}
648

649
func apicRedirectDst(rpDn string, ip string, mac string,
650
        descr string, healthGroupDn string, enablePbrTracking bool) apicapi.ApicObject {
1✔
651
        dst := apicapi.NewVnsRedirectDest(rpDn, ip, mac).SetAttr("descr", descr)
1✔
652
        if healthGroupDn != "" && enablePbrTracking {
2✔
653
                dst.AddChild(apicapi.NewVnsRsRedirectHealthGroup(dst.GetDn(),
1✔
654
                        healthGroupDn))
1✔
655
        }
1✔
656
        return dst
1✔
657
}
658

659
func (cont *AciController) apicRedirectPol(name string, tenantName string, nodes []string,
660
        nodeMap map[string]*metadata.ServiceEndpoint,
661
        monPolDn string, enablePbrTracking bool) (apicapi.ApicObject, string) {
1✔
662
        rp := apicapi.NewVnsSvcRedirectPol(tenantName, name)
1✔
663
        rp.SetAttr("thresholdDownAction", "deny")
1✔
664
        if cont.config.DisableResilientHashing {
1✔
NEW
665
                rp.SetAttr("resilientHashEnabled", "no")
×
NEW
666
        }
×
667
        rpDn := rp.GetDn()
1✔
668
        for _, node := range nodes {
2✔
669
                cont.indexMutex.Lock()
1✔
670
                serviceEp, ok := nodeMap[node]
1✔
671
                if !ok {
1✔
672
                        continue
×
673
                }
674
                if serviceEp.Ipv4 != nil {
2✔
675
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv4.String(),
1✔
676
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
1✔
677
                }
1✔
678
                if serviceEp.Ipv6 != nil {
1✔
679
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv6.String(),
×
680
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
×
681
                }
×
682
                cont.indexMutex.Unlock()
1✔
683
        }
684
        if monPolDn != "" && enablePbrTracking {
2✔
685
                rp.AddChild(apicapi.NewVnsRsIPSLAMonitoringPol(rpDn, monPolDn))
1✔
686
        }
1✔
687
        return rp, rpDn
1✔
688
}
689

690
func apicExtNetCreate(enDn string, ingress string, ipv4 bool,
691
        cidr bool, sharedSec bool) apicapi.ApicObject {
1✔
692
        if !cidr {
2✔
693
                if ipv4 {
2✔
694
                        ingress += "/32"
1✔
695
                } else {
1✔
696
                        ingress += "/128"
×
697
                }
×
698
        }
699
        subnet := apicapi.NewL3extSubnet(enDn, ingress, "", "")
1✔
700
        if sharedSec {
2✔
701
                subnet.SetAttr("scope", "import-security,shared-security")
1✔
702
        }
1✔
703
        return subnet
1✔
704
}
705

706
func apicExtNet(name string, tenantName string, l3Out string,
707
        ingresses []string, sharedSecurity bool, snat bool) apicapi.ApicObject {
1✔
708
        en := apicapi.NewL3extInstP(tenantName, l3Out, name)
1✔
709
        enDn := en.GetDn()
1✔
710
        if snat {
2✔
711
                en.AddChild(apicapi.NewFvRsCons(enDn, name))
1✔
712
        } else {
2✔
713
                en.AddChild(apicapi.NewFvRsProv(enDn, name))
1✔
714
        }
1✔
715

716
        for _, ingress := range ingresses {
2✔
717
                ip, _, _ := net.ParseCIDR(ingress)
1✔
718
                // If ingress is a subnet
1✔
719
                if ip != nil {
2✔
720
                        if ip != nil && ip.To4() != nil {
2✔
721
                                subnet := apicExtNetCreate(enDn, ingress, true, true, sharedSecurity)
1✔
722
                                en.AddChild(subnet)
1✔
723
                        } else if ip != nil && ip.To16() != nil {
1✔
724
                                subnet := apicExtNetCreate(enDn, ingress, false, true, sharedSecurity)
×
725
                                en.AddChild(subnet)
×
726
                        }
×
727
                } else {
1✔
728
                        // If ingress is an IP address
1✔
729
                        ip := net.ParseIP(ingress)
1✔
730
                        if ip != nil && ip.To4() != nil {
2✔
731
                                subnet := apicExtNetCreate(enDn, ingress, true, false, sharedSecurity)
1✔
732
                                en.AddChild(subnet)
1✔
733
                        } else if ip != nil && ip.To16() != nil {
1✔
734
                                subnet := apicExtNetCreate(enDn, ingress, false, false, sharedSecurity)
×
735
                                en.AddChild(subnet)
×
736
                        }
×
737
                }
738
        }
739
        return en
1✔
740
}
741

742
func apicDefaultEgCons(conName string, tenantName string,
743
        appProfile string, epg string) apicapi.ApicObject {
×
744
        enDn := fmt.Sprintf("uni/tn-%s/ap-%s/epg-%s", tenantName, appProfile, epg)
×
745
        return apicapi.NewFvRsCons(enDn, conName)
×
746
}
×
747

748
func apicExtNetCons(conName string, tenantName string,
749
        l3Out string, net string) apicapi.ApicObject {
1✔
750
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
751
        return apicapi.NewFvRsCons(enDn, conName)
1✔
752
}
1✔
753

754
func apicExtNetProv(conName string, tenantName string,
755
        l3Out string, net string) apicapi.ApicObject {
1✔
756
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
757
        return apicapi.NewFvRsProv(enDn, conName)
1✔
758
}
1✔
759

760
// Helper function to check if a string item exists in a slice
761
func stringInSlice(str string, list []string) bool {
1✔
762
        for _, v := range list {
2✔
763
                if v == str {
2✔
764
                        return true
1✔
765
                }
1✔
766
        }
767
        return false
×
768
}
769

770
func validScope(scope string) bool {
1✔
771
        validValues := []string{"", "context", "tenant", "global"}
1✔
772
        return stringInSlice(scope, validValues)
1✔
773
}
1✔
774

775
func (cont *AciController) getGraphNameFromContract(name, tenantName string) (string, error) {
×
776
        var graphName string
×
777
        args := []string{
×
778
                "query-target=subtree",
×
779
        }
×
780
        url := fmt.Sprintf("/api/node/mo/uni/tn-%s/brc-%s.json?%s", tenantName, name, strings.Join(args, "&"))
×
781
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
782
        if err != nil {
×
783
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
784
                return graphName, err
×
785
        }
×
786
        for _, obj := range apicresp.Imdata {
×
787
                for class, body := range obj {
×
788
                        if class == "vzRsSubjGraphAtt" {
×
789
                                tnVnsAbsGraphName, ok := body.Attributes["tnVnsAbsGraphName"].(string)
×
790
                                if ok {
×
791
                                        graphName = tnVnsAbsGraphName
×
792
                                }
×
793
                                break
×
794
                        }
795
                }
796
        }
797
        cont.log.Debug("graphName: ", graphName)
×
798
        return graphName, err
×
799
}
800

801
func apicContract(conName string, tenantName string,
802
        graphName string, scopeName string, isSnatPbrFltrChain bool,
803
        customSGAnnot bool) apicapi.ApicObject {
1✔
804
        con := apicapi.NewVzBrCP(tenantName, conName)
1✔
805
        if scopeName != "" && scopeName != "context" {
2✔
806
                con.SetAttr("scope", scopeName)
1✔
807
        }
1✔
808
        cs := apicapi.NewVzSubj(con.GetDn(), "loadbalancedservice")
1✔
809
        csDn := cs.GetDn()
1✔
810
        if isSnatPbrFltrChain {
2✔
811
                cs.SetAttr("revFltPorts", "no")
1✔
812
                inTerm := apicapi.NewVzInTerm(csDn)
1✔
813
                outTerm := apicapi.NewVzOutTerm(csDn)
1✔
814
                inTerm.AddChild(apicapi.NewVzRsInTermGraphAtt(inTerm.GetDn(), graphName))
1✔
815
                inTerm.AddChild(apicapi.NewVzRsFiltAtt(inTerm.GetDn(), conName+"_fromCons-toProv"))
1✔
816
                outTerm.AddChild(apicapi.NewVzRsOutTermGraphAtt(outTerm.GetDn(), graphName))
1✔
817
                outTerm.AddChild(apicapi.NewVzRsFiltAtt(outTerm.GetDn(), conName+"_fromProv-toCons"))
1✔
818
                cs.AddChild(inTerm)
1✔
819
                cs.AddChild(outTerm)
1✔
820
        } else {
2✔
821
                cs.AddChild(apicapi.NewVzRsSubjGraphAtt(csDn, graphName, customSGAnnot))
1✔
822
                cs.AddChild(apicapi.NewVzRsSubjFiltAtt(csDn, conName))
1✔
823
        }
1✔
824
        con.AddChild(cs)
1✔
825
        return con
1✔
826
}
827

828
func apicDevCtx(name string, tenantName string,
829
        graphName string, deviceName string, bdName string, rpDn string, isSnatPbrFltrChain bool) apicapi.ApicObject {
1✔
830
        cc := apicapi.NewVnsLDevCtx(tenantName, name, graphName, "loadbalancer")
1✔
831
        ccDn := cc.GetDn()
1✔
832
        graphDn := fmt.Sprintf("uni/tn-%s/lDevVip-%s", tenantName, deviceName)
1✔
833
        lifDn := fmt.Sprintf("%s/lIf-%s", graphDn, "interface")
1✔
834
        bdDn := fmt.Sprintf("uni/tn-%s/BD-%s", tenantName, bdName)
1✔
835
        cc.AddChild(apicapi.NewVnsRsLDevCtxToLDev(ccDn, graphDn))
1✔
836
        rpDnBase := rpDn
1✔
837
        for _, ctxConn := range []string{"consumer", "provider"} {
2✔
838
                lifCtx := apicapi.NewVnsLIfCtx(ccDn, ctxConn)
1✔
839
                if isSnatPbrFltrChain {
2✔
840
                        if ctxConn == "consumer" {
2✔
841
                                rpDn = rpDnBase + "_Cons"
1✔
842
                        } else {
2✔
843
                                rpDn = rpDnBase + "_Prov"
1✔
844
                        }
1✔
845
                }
846
                lifCtxDn := lifCtx.GetDn()
1✔
847
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToSvcRedirectPol(lifCtxDn, rpDn))
1✔
848
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToBD(lifCtxDn, bdDn))
1✔
849
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToLIf(lifCtxDn, lifDn))
1✔
850
                cc.AddChild(lifCtx)
1✔
851
        }
852
        return cc
1✔
853
}
854

855
func apicFilterEntry(filterDn string, count string, p_start string,
856
        p_end string, protocol string, stateful string, snat bool, outTerm bool) apicapi.ApicObject {
1✔
857
        fe := apicapi.NewVzEntry(filterDn, count)
1✔
858
        fe.SetAttr("etherT", "ip")
1✔
859
        fe.SetAttr("prot", protocol)
1✔
860
        if snat {
2✔
861
                if outTerm {
2✔
862
                        if protocol == "tcp" {
2✔
863
                                fe.SetAttr("tcpRules", "est")
1✔
864
                        }
1✔
865
                        // Reverse the ports for outTerm
866
                        fe.SetAttr("dFromPort", p_start)
1✔
867
                        fe.SetAttr("dToPort", p_end)
1✔
868
                } else {
1✔
869
                        fe.SetAttr("sFromPort", p_start)
1✔
870
                        fe.SetAttr("sToPort", p_end)
1✔
871
                }
1✔
872
        } else {
1✔
873
                fe.SetAttr("dFromPort", p_start)
1✔
874
                fe.SetAttr("dToPort", p_end)
1✔
875
        }
1✔
876
        fe.SetAttr("stateful", stateful)
1✔
877
        return fe
1✔
878
}
879
func apicFilter(name string, tenantName string,
880
        portSpec []v1.ServicePort, snat bool, snatRange portRangeSnat) apicapi.ApicObject {
1✔
881
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
882
        filterDn := filter.GetDn()
1✔
883

1✔
884
        var i int
1✔
885
        var port v1.ServicePort
1✔
886
        for i, port = range portSpec {
2✔
887
                pstr := strconv.Itoa(int(port.Port))
1✔
888
                proto := getProtocolStr(port.Protocol)
1✔
889
                fe := apicFilterEntry(filterDn, strconv.Itoa(i), pstr,
1✔
890
                        pstr, proto, "no", false, false)
1✔
891
                filter.AddChild(fe)
1✔
892
        }
1✔
893

894
        if snat {
1✔
895
                portSpec := []portRangeSnat{snatRange}
×
896
                p_start := strconv.Itoa(portSpec[0].start)
×
897
                p_end := strconv.Itoa(portSpec[0].end)
×
898

×
899
                fe1 := apicFilterEntry(filterDn, strconv.Itoa(i+1), p_start,
×
900
                        p_end, "tcp", "no", false, false)
×
901
                filter.AddChild(fe1)
×
902
                fe2 := apicFilterEntry(filterDn, strconv.Itoa(i+2), p_start,
×
903
                        p_end, "udp", "no", false, false)
×
904
                filter.AddChild(fe2)
×
905
        }
×
906
        return filter
1✔
907
}
908

909
func apicFilterSnat(name string, tenantName string,
910
        portSpec []portRangeSnat, outTerm bool) apicapi.ApicObject {
1✔
911
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
912
        filterDn := filter.GetDn()
1✔
913

1✔
914
        p_start := strconv.Itoa(portSpec[0].start)
1✔
915
        p_end := strconv.Itoa(portSpec[0].end)
1✔
916

1✔
917
        fe := apicFilterEntry(filterDn, "0", p_start,
1✔
918
                p_end, "tcp", "no", true, outTerm)
1✔
919
        filter.AddChild(fe)
1✔
920
        fe1 := apicFilterEntry(filterDn, "1", p_start,
1✔
921
                p_end, "udp", "no", true, outTerm)
1✔
922
        filter.AddChild(fe1)
1✔
923

1✔
924
        return filter
1✔
925
}
1✔
926

927
func (cont *AciController) updateServiceDeviceInstance(key string,
928
        service *v1.Service) error {
1✔
929
        cont.indexMutex.Lock()
1✔
930
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
931
        cont.serviceEndPoints.GetnodesMetadata(key, service, nodeMap)
1✔
932
        cont.indexMutex.Unlock()
1✔
933

1✔
934
        var nodes []string
1✔
935
        for node := range nodeMap {
2✔
936
                nodes = append(nodes, node)
1✔
937
        }
1✔
938
        sort.Strings(nodes)
1✔
939
        name := cont.aciNameForKey("svc", key)
1✔
940
        var conScope string
1✔
941
        scopeVal, ok := service.ObjectMeta.Annotations[metadata.ServiceContractScopeAnnotation]
1✔
942
        if ok {
2✔
943
                normScopeVal := strings.ToLower(scopeVal)
1✔
944
                if !validScope(normScopeVal) {
1✔
945
                        errString := "Invalid service contract scope value provided " + scopeVal
×
946
                        err := errors.New(errString)
×
947
                        serviceLogger(cont.log, service).Error("Could not create contract: ", err)
×
948
                        return err
×
949
                } else {
1✔
950
                        conScope = normScopeVal
1✔
951
                }
1✔
952
        } else {
1✔
953
                conScope = DefaultServiceContractScope
1✔
954
        }
1✔
955

956
        var sharedSecurity bool
1✔
957
        if conScope == "global" {
2✔
958
                sharedSecurity = true
1✔
959
        } else {
2✔
960
                sharedSecurity = DefaultServiceExtSubNetShared
1✔
961
        }
1✔
962

963
        graphName := cont.aciNameForKey("svc", "global")
1✔
964
        deviceName := cont.aciNameForKey("svc", "global")
1✔
965
        _, customSGAnnPresent := service.ObjectMeta.Annotations[metadata.ServiceGraphNameAnnotation]
1✔
966
        if customSGAnnPresent {
1✔
967
                customSG, err := cont.getGraphNameFromContract(name, cont.config.AciVrfTenant)
×
968
                if err == nil {
×
969
                        graphName = customSG
×
970
                }
×
971
        }
972
        cont.log.Debug("Using service graph ", graphName, " for service ", key)
1✔
973

1✔
974
        var serviceObjs apicapi.ApicSlice
1✔
975
        if len(nodes) > 0 {
2✔
976
                // 1. Service redirect policy
1✔
977
                // The service redirect policy contains the MAC address
1✔
978
                // and IP address of each of the service endpoints for
1✔
979
                // each node that hosts a pod for this service.  The
1✔
980
                // example below shows the case of two nodes.
1✔
981
                rp, rpDn :=
1✔
982
                        cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
1✔
983
                                nodeMap, cont.staticMonPolDn(), cont.config.AciPbrTrackingNonSnat)
1✔
984
                serviceObjs = append(serviceObjs, rp)
1✔
985

1✔
986
                // 2. Service graph contract and external network
1✔
987
                // The service graph contract must be bound to the service
1✔
988
                // graph.  This contract must be consumed by the default
1✔
989
                // layer 3 network and provided by the service layer 3
1✔
990
                // network.
1✔
991
                {
2✔
992
                        var ingresses []string
1✔
993
                        for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
994
                                ingresses = append(ingresses, ingress.IP)
1✔
995
                        }
1✔
996
                        serviceObjs = append(serviceObjs,
1✔
997
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
998
                                        cont.config.AciL3Out, ingresses, sharedSecurity, false))
1✔
999
                }
1000

1001
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, false, customSGAnnPresent)
1✔
1002
                serviceObjs = append(serviceObjs, contract)
1✔
1003
                for _, net := range cont.config.AciExtNetworks {
2✔
1004
                        serviceObjs = append(serviceObjs,
1✔
1005
                                apicExtNetCons(name, cont.config.AciVrfTenant,
1✔
1006
                                        cont.config.AciL3Out, net))
1✔
1007
                }
1✔
1008

1009
                if cont.config.AddExternalContractToDefaultEPG && service.Spec.Type == v1.ServiceTypeLoadBalancer {
1✔
1010
                        defaultEpgTenant := cont.config.DefaultEg.PolicySpace
×
1011
                        defaultEpgStringSplit := strings.Split(cont.config.DefaultEg.Name, "|")
×
1012
                        var defaultEpgName, appProfile string
×
1013
                        if len(defaultEpgStringSplit) > 1 {
×
1014
                                appProfile = defaultEpgStringSplit[0]
×
1015
                                defaultEpgName = defaultEpgStringSplit[1]
×
1016
                        } else {
×
1017
                                appProfile = cont.config.AppProfile
×
1018
                                defaultEpgName = defaultEpgStringSplit[0]
×
1019
                        }
×
1020
                        serviceObjs = append(serviceObjs,
×
1021
                                apicDefaultEgCons(name, defaultEpgTenant, appProfile, defaultEpgName))
×
1022
                }
1023

1024
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1025
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1026

1✔
1027
                _, snat := cont.snatServices[key]
1✔
1028
                filter := apicFilter(name, cont.config.AciVrfTenant,
1✔
1029
                        service.Spec.Ports, snat, defaultPortRange)
1✔
1030
                serviceObjs = append(serviceObjs, filter)
1✔
1031

1✔
1032
                // 3. Device cluster context
1✔
1033
                // The logical device context binds the service contract
1✔
1034
                // to the redirect policy and the device cluster and
1✔
1035
                // bridge domain for the device cluster.
1✔
1036
                serviceObjs = append(serviceObjs,
1✔
1037
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, deviceName,
1✔
1038
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, false))
1✔
1039
        }
1040

1041
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1042
        return nil
1✔
1043
}
1044

1045
func (cont *AciController) updateServiceDeviceInstanceSnat(key string) error {
1✔
1046
        nodeList := cont.nodeIndexer.List()
1✔
1047
        cont.indexMutex.Lock()
1✔
1048
        if len(cont.nodeServiceMetaCache) == 0 {
2✔
1049
                cont.indexMutex.Unlock()
1✔
1050
                return nil
1✔
1051
        }
1✔
1052
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
1053
        sort.Slice(nodeList, func(i, j int) bool {
2✔
1054
                nodeA := nodeList[i].(*v1.Node)
1✔
1055
                nodeB := nodeList[j].(*v1.Node)
1✔
1056
                return nodeA.ObjectMeta.Name < nodeB.ObjectMeta.Name
1✔
1057
        })
1✔
1058
        for itr, nodeItem := range nodeList {
2✔
1059
                if itr == cont.config.MaxSvcGraphNodes {
1✔
1060
                        break
×
1061
                }
1062
                node := nodeItem.(*v1.Node)
1✔
1063
                nodeName := node.ObjectMeta.Name
1✔
1064
                nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
1065
                if !ok {
2✔
1066
                        continue
1✔
1067
                }
1068
                _, ok = cont.fabricPathForNode(nodeName)
1✔
1069
                if !ok {
1✔
1070
                        continue
×
1071
                }
1072
                nodeLabels := node.ObjectMeta.Labels
1✔
1073
                excludeNode := cont.nodeLabelsInExcludeList(nodeLabels)
1✔
1074
                if excludeNode {
1✔
1075
                        continue
×
1076
                }
1077
                nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
1078
        }
1079
        cont.indexMutex.Unlock()
1✔
1080

1✔
1081
        var nodes []string
1✔
1082
        for node := range nodeMap {
2✔
1083
                nodes = append(nodes, node)
1✔
1084
        }
1✔
1085
        sort.Strings(nodes)
1✔
1086
        name := cont.aciNameForKey("snat", key)
1✔
1087
        var conScope = cont.config.SnatSvcContractScope
1✔
1088
        sharedSecurity := true
1✔
1089

1✔
1090
        graphName := cont.aciNameForKey("svc", "global")
1✔
1091
        var serviceObjs apicapi.ApicSlice
1✔
1092
        if len(nodes) > 0 {
2✔
1093
                // 1. Service redirect policy
1✔
1094
                // The service redirect policy contains the MAC address
1✔
1095
                // and IP address of each of the service endpoints for
1✔
1096
                // each node that hosts a pod for this service.
1✔
1097
                // For SNAT with the introduction of filter-chain usage, to work-around
1✔
1098
                // an APIC limitation, creating two PBR policies with same nodes.
1✔
1099
                var rpDn string
1✔
1100
                var rp apicapi.ApicObject
1✔
1101
                if cont.apicConn.SnatPbrFltrChain {
2✔
1102
                        rpCons, rpDnCons :=
1✔
1103
                                cont.apicRedirectPol(name+"_Cons", cont.config.AciVrfTenant, nodes,
1✔
1104
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1105
                        serviceObjs = append(serviceObjs, rpCons)
1✔
1106
                        rpProv, _ :=
1✔
1107
                                cont.apicRedirectPol(name+"_Prov", cont.config.AciVrfTenant, nodes,
1✔
1108
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1109
                        serviceObjs = append(serviceObjs, rpProv)
1✔
1110
                        rpDn = strings.TrimSuffix(rpDnCons, "_Cons")
1✔
1111
                } else {
1✔
1112
                        rp, rpDn =
×
1113
                                cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
×
1114
                                        nodeMap, cont.staticMonPolDn(), true)
×
1115
                        serviceObjs = append(serviceObjs, rp)
×
1116
                }
×
1117
                // 2. Service graph contract and external network
1118
                // The service graph contract must be bound to the
1119
                // service
1120
                // graph.  This contract must be consumed by the default
1121
                // layer 3 network and provided by the service layer 3
1122
                // network.
1123
                {
1✔
1124
                        var ingresses []string
1✔
1125
                        for _, policy := range cont.snatPolicyCache {
2✔
1126
                                ingresses = append(ingresses, policy.SnatIp...)
1✔
1127
                        }
1✔
1128
                        serviceObjs = append(serviceObjs,
1✔
1129
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1130
                                        cont.config.AciL3Out, ingresses, sharedSecurity, true))
1✔
1131
                }
1132

1133
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, cont.apicConn.SnatPbrFltrChain, false)
1✔
1134
                serviceObjs = append(serviceObjs, contract)
1✔
1135

1✔
1136
                for _, net := range cont.config.AciExtNetworks {
2✔
1137
                        serviceObjs = append(serviceObjs,
1✔
1138
                                apicExtNetProv(name, cont.config.AciVrfTenant,
1✔
1139
                                        cont.config.AciL3Out, net))
1✔
1140
                }
1✔
1141

1142
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1143
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1144
                portSpec := []portRangeSnat{defaultPortRange}
1✔
1145
                if cont.apicConn.SnatPbrFltrChain {
2✔
1146
                        filterIn := apicFilterSnat(name+"_fromCons-toProv", cont.config.AciVrfTenant, portSpec, false)
1✔
1147
                        serviceObjs = append(serviceObjs, filterIn)
1✔
1148
                        filterOut := apicFilterSnat(name+"_fromProv-toCons", cont.config.AciVrfTenant, portSpec, true)
1✔
1149
                        serviceObjs = append(serviceObjs, filterOut)
1✔
1150
                } else {
1✔
1151
                        filter := apicFilterSnat(name, cont.config.AciVrfTenant, portSpec, false)
×
1152
                        serviceObjs = append(serviceObjs, filter)
×
1153
                }
×
1154
                // 3. Device cluster context
1155
                // The logical device context binds the service contract
1156
                // to the redirect policy and the device cluster and
1157
                // bridge domain for the device cluster.
1158
                serviceObjs = append(serviceObjs,
1✔
1159
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, graphName,
1✔
1160
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, cont.apicConn.SnatPbrFltrChain))
1✔
1161
        }
1162
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1163
        return nil
1✔
1164
}
1165

1166
func (cont *AciController) nodeLabelsInExcludeList(Labels map[string]string) bool {
1✔
1167
        nodeSnatRedirectExclude := cont.config.NodeSnatRedirectExclude
1✔
1168

1✔
1169
        for _, nodeGroup := range nodeSnatRedirectExclude {
1✔
1170
                if len(nodeGroup.Labels) == 0 {
×
1171
                        continue
×
1172
                }
1173
                matchFound := true
×
1174
                for _, label := range nodeGroup.Labels {
×
1175
                        if _, ok := Labels["node-role.kubernetes.io/"+label]; !ok {
×
1176
                                matchFound = false
×
1177
                                break
×
1178
                        }
1179
                }
1180
                if matchFound {
×
1181
                        return true
×
1182
                }
×
1183
        }
1184
        return false
1✔
1185
}
1186

1187
func (cont *AciController) queueServiceUpdateByKey(key string) {
1✔
1188
        cont.serviceQueue.Add(key)
1✔
1189
}
1✔
1190

1191
func (cont *AciController) queueServiceUpdate(service *v1.Service) {
1✔
1192
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
1193
        if err != nil {
1✔
1194
                serviceLogger(cont.log, service).
×
1195
                        Error("Could not create service key: ", err)
×
1196
                return
×
1197
        }
×
1198
        cont.serviceQueue.Add(key)
1✔
1199
}
1200

1201
func apicDeviceCluster(name string, vrfTenant string,
1202
        physDom string, encap string,
1203
        nodes []string, nodeMap map[string]string) (apicapi.ApicObject, string) {
1✔
1204
        dc := apicapi.NewVnsLDevVip(vrfTenant, name)
1✔
1205
        dc.SetAttr("managed", "no")
1✔
1206
        dcDn := dc.GetDn()
1✔
1207
        dc.AddChild(apicapi.NewVnsRsALDevToPhysDomP(dcDn,
1✔
1208
                fmt.Sprintf("uni/phys-%s", physDom)))
1✔
1209
        lif := apicapi.NewVnsLIf(dcDn, "interface")
1✔
1210
        lif.SetAttr("encap", encap)
1✔
1211
        lifDn := lif.GetDn()
1✔
1212

1✔
1213
        for _, node := range nodes {
2✔
1214
                path, ok := nodeMap[node]
1✔
1215
                if !ok {
1✔
1216
                        continue
×
1217
                }
1218

1219
                cdev := apicapi.NewVnsCDev(dcDn, node)
1✔
1220
                cif := apicapi.NewVnsCif(cdev.GetDn(), "interface")
1✔
1221
                cif.AddChild(apicapi.NewVnsRsCIfPathAtt(cif.GetDn(), path))
1✔
1222
                cdev.AddChild(cif)
1✔
1223
                lif.AddChild(apicapi.NewVnsRsCIfAttN(lifDn, cif.GetDn()))
1✔
1224
                dc.AddChild(cdev)
1✔
1225
        }
1226

1227
        dc.AddChild(lif)
1✔
1228

1✔
1229
        return dc, dcDn
1✔
1230
}
1231

1232
func apicServiceGraph(name string, tenantName string,
1233
        dcDn string) apicapi.ApicObject {
1✔
1234
        sg := apicapi.NewVnsAbsGraph(tenantName, name)
1✔
1235
        sgDn := sg.GetDn()
1✔
1236
        var provDn string
1✔
1237
        var consDn string
1✔
1238
        var cTermDn string
1✔
1239
        var pTermDn string
1✔
1240
        {
2✔
1241
                an := apicapi.NewVnsAbsNode(sgDn, "loadbalancer")
1✔
1242
                an.SetAttr("managed", "no")
1✔
1243
                an.SetAttr("routingMode", "Redirect")
1✔
1244
                anDn := an.GetDn()
1✔
1245
                cons := apicapi.NewVnsAbsFuncConn(anDn, "consumer")
1✔
1246
                consDn = cons.GetDn()
1✔
1247
                an.AddChild(cons)
1✔
1248
                prov := apicapi.NewVnsAbsFuncConn(anDn, "provider")
1✔
1249
                provDn = prov.GetDn()
1✔
1250
                an.AddChild(prov)
1✔
1251
                an.AddChild(apicapi.NewVnsRsNodeToLDev(anDn, dcDn))
1✔
1252
                sg.AddChild(an)
1✔
1253
        }
1✔
1254
        {
1✔
1255
                tnc := apicapi.NewVnsAbsTermNodeCon(sgDn, "T1")
1✔
1256
                tncDn := tnc.GetDn()
1✔
1257
                cTerm := apicapi.NewVnsAbsTermConn(tncDn)
1✔
1258
                cTermDn = cTerm.GetDn()
1✔
1259
                tnc.AddChild(cTerm)
1✔
1260
                tnc.AddChild(apicapi.NewVnsInTerm(tncDn))
1✔
1261
                tnc.AddChild(apicapi.NewVnsOutTerm(tncDn))
1✔
1262
                sg.AddChild(tnc)
1✔
1263
        }
1✔
1264
        {
1✔
1265
                tnp := apicapi.NewVnsAbsTermNodeProv(sgDn, "T2")
1✔
1266
                tnpDn := tnp.GetDn()
1✔
1267
                pTerm := apicapi.NewVnsAbsTermConn(tnpDn)
1✔
1268
                pTermDn = pTerm.GetDn()
1✔
1269
                tnp.AddChild(pTerm)
1✔
1270
                tnp.AddChild(apicapi.NewVnsInTerm(tnpDn))
1✔
1271
                tnp.AddChild(apicapi.NewVnsOutTerm(tnpDn))
1✔
1272
                sg.AddChild(tnp)
1✔
1273
        }
1✔
1274
        {
1✔
1275
                acc := apicapi.NewVnsAbsConnection(sgDn, "C1")
1✔
1276
                acc.SetAttr("connDir", "provider")
1✔
1277
                accDn := acc.GetDn()
1✔
1278
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, consDn))
1✔
1279
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, cTermDn))
1✔
1280
                sg.AddChild(acc)
1✔
1281
        }
1✔
1282
        {
1✔
1283
                acp := apicapi.NewVnsAbsConnection(sgDn, "C2")
1✔
1284
                acp.SetAttr("connDir", "provider")
1✔
1285
                acpDn := acp.GetDn()
1✔
1286
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, provDn))
1✔
1287
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, pTermDn))
1✔
1288
                sg.AddChild(acp)
1✔
1289
        }
1✔
1290
        return sg
1✔
1291
}
1292
func (cont *AciController) updateDeviceCluster() {
1✔
1293
        nodeMap := make(map[string]string)
1✔
1294
        cont.indexMutex.Lock()
1✔
1295
        for node := range cont.nodeOpflexDevice {
2✔
1296
                cont.log.Debug("Processing node in nodeOpflexDevice cache : ", node)
1✔
1297
                fabricPath, ok := cont.fabricPathForNode(node)
1✔
1298
                if !ok {
2✔
1299
                        continue
1✔
1300
                }
1301
                nodeMap[node] = fabricPath
1✔
1302
        }
1303

1304
        // For clusters other than OpenShift On OpenStack,
1305
        // openStackFabricPathDnMap will be empty
1306
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1307
                nodeMap[host] = opflexOdevInfo.fabricPathDn
×
1308
        }
×
1309

1310
        // For OpenShift On OpenStack clusters,
1311
        // hostFabricPathDnMap will be empty
1312
        for host, fabricPathDn := range cont.hostFabricPathDnMap {
1✔
1313
                nodeMap[host] = fabricPathDn
×
1314
        }
×
1315
        cont.indexMutex.Unlock()
1✔
1316

1✔
1317
        var nodes []string
1✔
1318
        for node := range nodeMap {
2✔
1319
                nodes = append(nodes, node)
1✔
1320
        }
1✔
1321
        sort.Strings(nodes)
1✔
1322

1✔
1323
        name := cont.aciNameForKey("svc", "global")
1✔
1324
        var serviceObjs apicapi.ApicSlice
1✔
1325

1✔
1326
        // 1. Device cluster:
1✔
1327
        // The device cluster is a set of physical paths that need to be
1✔
1328
        // created for each node in the cluster, that correspond to the
1✔
1329
        // service interface for each node.
1✔
1330
        dc, dcDn := apicDeviceCluster(name, cont.config.AciVrfTenant,
1✔
1331
                cont.config.AciServicePhysDom, cont.config.AciServiceEncap,
1✔
1332
                nodes, nodeMap)
1✔
1333
        serviceObjs = append(serviceObjs, dc)
1✔
1334

1✔
1335
        // 2. Service graph template
1✔
1336
        // The service graph controls how the traffic will be redirected.
1✔
1337
        // A service graph must be created for each device cluster.
1✔
1338
        serviceObjs = append(serviceObjs,
1✔
1339
                apicServiceGraph(name, cont.config.AciVrfTenant, dcDn))
1✔
1340

1✔
1341
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1342
}
1343

1344
func (cont *AciController) fabricPathLogger(node string,
1345
        obj apicapi.ApicObject) *logrus.Entry {
1✔
1346
        return cont.log.WithFields(logrus.Fields{
1✔
1347
                "fabricPath": obj.GetAttr("fabricPathDn"),
1✔
1348
                "mac":        obj.GetAttr("mac"),
1✔
1349
                "node":       node,
1✔
1350
                "obj":        obj,
1✔
1351
        })
1✔
1352
}
1✔
1353

1354
func (cont *AciController) setOpenStackSystemId() string {
×
1355

×
1356
        // 1) get opflexIDEp with containerName == <node name of any one of the openshift nodes>
×
1357
        // 2) extract OpenStack system id from compHvDn attribute
×
1358
        //    comp/prov-OpenStack/ctrlr-[k8s-scale]-k8s-scale/hv-overcloud-novacompute-0 - sample compHvDn,
×
1359
        //    where k8s-scale is the system id
×
1360

×
1361
        var systemId string
×
1362
        nodeList := cont.nodeIndexer.List()
×
1363
        if len(nodeList) < 1 {
×
1364
                return systemId
×
1365
        }
×
1366
        node := nodeList[0].(*v1.Node)
×
1367
        nodeName := node.ObjectMeta.Name
×
1368
        opflexIDEpFilter := fmt.Sprintf("query-target-filter=and(eq(opflexIDEp.containerName,\"%s\"))", nodeName)
×
1369
        opflexIDEpArgs := []string{
×
1370
                opflexIDEpFilter,
×
1371
        }
×
1372
        url := fmt.Sprintf("/api/node/class/opflexIDEp.json?%s", strings.Join(opflexIDEpArgs, "&"))
×
1373
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1374
        if err != nil {
×
1375
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1376
                return systemId
×
1377
        }
×
1378
        for _, obj := range apicresp.Imdata {
×
1379
                for _, body := range obj {
×
1380
                        compHvDn, ok := body.Attributes["compHvDn"].(string)
×
1381
                        if ok {
×
1382
                                systemId = compHvDn[strings.IndexByte(compHvDn, '[')+1 : strings.IndexByte(compHvDn, ']')]
×
1383
                                break
×
1384
                        }
1385
                }
1386
        }
1387
        cont.indexMutex.Lock()
×
1388
        cont.openStackSystemId = systemId
×
1389
        cont.log.Info("Setting OpenStack system id : ", cont.openStackSystemId)
×
1390
        cont.indexMutex.Unlock()
×
1391
        return systemId
×
1392
}
1393

1394
// Returns true when a new OpenStack opflexODev is added
1395
func (cont *AciController) openStackOpflexOdevUpdate(obj apicapi.ApicObject) bool {
×
1396

×
1397
        // If opflexOdev compHvDn contains comp/prov-OpenShift/ctrlr-[<systemid>]-<systemid>,
×
1398
        // it means that it is an OpenStack OpflexOdev which belongs to OpenStack with system id <systemid>
×
1399

×
1400
        var deviceClusterUpdate bool
×
1401
        compHvDn := obj.GetAttrStr("compHvDn")
×
1402
        if strings.Contains(compHvDn, "prov-OpenStack") {
×
1403
                cont.indexMutex.Lock()
×
1404
                systemId := cont.openStackSystemId
×
1405
                cont.indexMutex.Unlock()
×
1406
                if systemId == "" {
×
1407
                        systemId = cont.setOpenStackSystemId()
×
1408
                }
×
1409
                if systemId == "" {
×
1410
                        cont.log.Error("Failed  to get OpenStack system id")
×
1411
                        return deviceClusterUpdate
×
1412
                }
×
1413
                prefix := fmt.Sprintf("comp/prov-OpenStack/ctrlr-[%s]-%s", systemId, systemId)
×
1414
                if strings.Contains(compHvDn, prefix) {
×
1415
                        cont.log.Info("Received notification for OpenStack opflexODev update, hostName: ",
×
1416
                                obj.GetAttrStr("hostName"), " dn: ", obj.GetAttrStr("dn"))
×
1417
                        cont.indexMutex.Lock()
×
1418
                        opflexOdevInfo, ok := cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")]
×
1419
                        if ok {
×
1420
                                opflexOdevInfo.opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1421
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = opflexOdevInfo
×
1422
                        } else {
×
1423
                                var openstackopflexodevinfo openstackOpflexOdevInfo
×
1424
                                opflexODevDn := make(map[string]struct{})
×
1425
                                opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1426
                                openstackopflexodevinfo.fabricPathDn = obj.GetAttrStr("fabricPathDn")
×
1427
                                openstackopflexodevinfo.opflexODevDn = opflexODevDn
×
1428
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = openstackopflexodevinfo
×
1429
                                deviceClusterUpdate = true
×
1430
                        }
×
1431
                        cont.indexMutex.Unlock()
×
1432
                }
1433
        }
1434
        return deviceClusterUpdate
×
1435
}
1436

1437
func (cont *AciController) infraRtAttEntPDeleted(dn string) {
×
1438
        // dn format : uni/infra/attentp-k8s-scale-esxi-aaep/rtattEntP-[uni/infra/funcprof/accbundle-esxi1-vpc-ipg]
×
1439
        re := regexp.MustCompile(`accbundle-([^]]+)`)
×
1440
        cont.log.Info("Processing delete of infraRtAttEntP: ", dn)
×
1441
        matches := re.FindStringSubmatch(dn)
×
1442
        if len(matches) < 2 {
×
1443
                cont.log.Error("Failed to extract ipg from dn : ", dn)
×
1444
                return
×
1445
        }
×
1446
        host := matches[1]
×
1447

×
1448
        cont.indexMutex.Lock()
×
1449
        _, ok := cont.hostFabricPathDnMap[host]
×
1450
        if ok {
×
1451
                delete(cont.hostFabricPathDnMap, host)
×
1452
                cont.log.Info("Deleted ipg : ", host)
×
1453
        }
×
1454
        cont.indexMutex.Unlock()
×
1455

×
1456
        if ok {
×
1457
                cont.updateDeviceCluster()
×
1458
        }
×
1459
}
1460

1461
func (cont *AciController) infraRtAttEntPChanged(obj apicapi.ApicObject) {
×
1462
        var tdn string
×
1463
        for _, body := range obj {
×
1464
                var ok bool
×
1465
                tdn, ok = body.Attributes["tDn"].(string)
×
1466
                if !ok {
×
1467
                        cont.log.Error("tDn missing in infraRtAttEntP")
×
1468
                        return
×
1469
                }
×
1470
        }
1471
        var updated bool
×
1472
        if tdn != "" {
×
1473
                cont.log.Info("infraRtAttEntP updated, tDn : ", tdn)
×
1474

×
1475
                // tdn format for vpc : /uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1476
                // tdn format for single leaf : /uni/infra/funcprof/accportgrp-IPG_CLIENT_SIM
×
1477

×
1478
                // Ignore processing of single leaf
×
1479
                if !strings.Contains(tdn, "/accbundle-") {
×
1480
                        cont.log.Info("Skipping processing of infraRtAttEntP update, not applicable for non-VPC configuration: ", tdn)
×
1481
                        return
×
1482
                }
×
1483

1484
                // extract esxi1-vpc-ipg
1485
                parts := strings.Split(tdn, "/")
×
1486
                lastPart := parts[len(parts)-1]
×
1487
                host := strings.TrimPrefix(lastPart, "accbundle-")
×
1488
                assocGrpFilter := fmt.Sprintf(`query-target-filter=and(eq(infraPortSummary.assocGrp,"%s"))`, tdn)
×
1489
                url := fmt.Sprintf("/api/class/infraPortSummary.json?%s", assocGrpFilter)
×
1490
                apicresp, err := cont.apicConn.GetApicResponse(url)
×
1491
                if err != nil {
×
1492
                        cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1493
                        return
×
1494
                }
×
1495
                for _, obj := range apicresp.Imdata {
×
1496
                        for _, body := range obj {
×
1497
                                pcPortDn, ok := body.Attributes["pcPortDn"].(string)
×
1498
                                if ok && pcPortDn != "" {
×
1499
                                        cont.indexMutex.Lock()
×
1500
                                        fabricPathDn, exists := cont.hostFabricPathDnMap[host]
×
1501
                                        if !exists || (exists && fabricPathDn != pcPortDn) {
×
1502
                                                cont.hostFabricPathDnMap[host] = pcPortDn
×
1503
                                                cont.log.Info("Updated fabricPathDn of ipg :", host, " to: ", pcPortDn)
×
1504
                                                updated = true
×
1505
                                        }
×
1506
                                        cont.indexMutex.Unlock()
×
1507
                                        break
×
1508
                                }
1509
                        }
1510
                }
1511

1512
        }
1513
        if updated {
×
1514
                cont.updateDeviceCluster()
×
1515
        }
×
1516
        return
×
1517
}
1518

1519
func (cont *AciController) opflexDeviceChanged(obj apicapi.ApicObject) {
1✔
1520
        devType := obj.GetAttrStr("devType")
1✔
1521
        domName := obj.GetAttrStr("domName")
1✔
1522
        ctrlrName := obj.GetAttrStr("ctrlrName")
1✔
1523

1✔
1524
        if !cont.config.DisableServiceVlanPreprovisioning && strings.Contains(cont.config.Flavor, "openstack") {
1✔
1525
                if cont.openStackOpflexOdevUpdate(obj) {
×
1526
                        cont.log.Info("OpenStack opflexODev for ", obj.GetAttrStr("hostName"), " is added")
×
1527
                        cont.updateDeviceCluster()
×
1528
                }
×
1529
        }
1530
        if (devType == cont.env.OpFlexDeviceType()) && (domName == cont.config.AciVmmDomain) && (ctrlrName == cont.config.AciVmmController) {
2✔
1531
                cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Processing opflex device update")
1✔
1532
                if obj.GetAttrStr("state") == "disconnected" {
2✔
1533
                        cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Opflex device disconnected")
1✔
1534
                        cont.indexMutex.Lock()
1✔
1535
                        for node, devices := range cont.nodeOpflexDevice {
1✔
1536
                                if node == obj.GetAttrStr("hostName") {
×
1537
                                        for _, device := range devices {
×
1538
                                                if device.GetDn() == obj.GetDn() {
×
1539
                                                        device.SetAttr("state", "disconnected")
×
1540
                                                        cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Debug("Opflex device cache updated for disconnected node")
×
1541
                                                }
×
1542
                                        }
1543
                                        cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", devices)
×
1544
                                        break
×
1545
                                }
1546
                        }
1547
                        cont.indexMutex.Unlock()
1✔
1548
                        cont.updateDeviceCluster()
1✔
1549
                        return
1✔
1550
                }
1551
                var nodeUpdates []string
1✔
1552

1✔
1553
                cont.indexMutex.Lock()
1✔
1554
                nodefound := false
1✔
1555
                for node, devices := range cont.nodeOpflexDevice {
2✔
1556
                        found := false
1✔
1557

1✔
1558
                        if node == obj.GetAttrStr("hostName") {
2✔
1559
                                nodefound = true
1✔
1560
                        }
1✔
1561

1562
                        for i, device := range devices {
2✔
1563
                                if device.GetDn() != obj.GetDn() {
2✔
1564
                                        continue
1✔
1565
                                }
1566
                                found = true
1✔
1567

1✔
1568
                                if obj.GetAttrStr("hostName") != node {
2✔
1569
                                        cont.fabricPathLogger(node, device).
1✔
1570
                                                Debug("Moving opflex device from node")
1✔
1571

1✔
1572
                                        devices = append(devices[:i], devices[i+1:]...)
1✔
1573
                                        cont.nodeOpflexDevice[node] = devices
1✔
1574
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1575
                                        break
1✔
1576
                                } else if (device.GetAttrStr("mac") != obj.GetAttrStr("mac")) ||
1✔
1577
                                        (device.GetAttrStr("fabricPathDn") != obj.GetAttrStr("fabricPathDn")) ||
1✔
1578
                                        (device.GetAttrStr("state") != obj.GetAttrStr("state")) {
2✔
1579
                                        cont.fabricPathLogger(node, obj).
1✔
1580
                                                Debug("Updating opflex device")
1✔
1581

1✔
1582
                                        devices = append(append(devices[:i], devices[i+1:]...), obj)
1✔
1583
                                        cont.nodeOpflexDevice[node] = devices
1✔
1584
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1585
                                        break
1✔
1586
                                }
1587
                        }
1588
                        if !found && obj.GetAttrStr("hostName") == node {
2✔
1589
                                cont.fabricPathLogger(node, obj).
1✔
1590
                                        Debug("Appending opflex device")
1✔
1591

1✔
1592
                                devices = append(devices, obj)
1✔
1593
                                cont.nodeOpflexDevice[node] = devices
1✔
1594
                                nodeUpdates = append(nodeUpdates, node)
1✔
1595
                        }
1✔
1596
                }
1597
                if !nodefound {
2✔
1598
                        node := obj.GetAttrStr("hostName")
1✔
1599
                        cont.fabricPathLogger(node, obj).Debug("Adding opflex device")
1✔
1600
                        cont.nodeOpflexDevice[node] = apicapi.ApicSlice{obj}
1✔
1601
                        nodeUpdates = append(nodeUpdates, node)
1✔
1602
                }
1✔
1603
                cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", cont.nodeOpflexDevice[obj.GetAttrStr("hostName")])
1✔
1604
                cont.indexMutex.Unlock()
1✔
1605

1✔
1606
                for _, node := range nodeUpdates {
2✔
1607
                        cont.env.NodeServiceChanged(node)
1✔
1608
                        cont.erspanSyncOpflexDev()
1✔
1609
                }
1✔
1610
                cont.updateDeviceCluster()
1✔
1611
        }
1612
}
1613

1614
func (cont *AciController) postOpflexDeviceDelete(nodes []string) {
1✔
1615
        cont.updateDeviceCluster()
1✔
1616
        for _, node := range nodes {
2✔
1617
                cont.env.NodeServiceChanged(node)
1✔
1618
                cont.erspanSyncOpflexDev()
1✔
1619
        }
1✔
1620
}
1621

1622
func (cont *AciController) opflexDeviceDeleted(dn string) {
1✔
1623
        var nodeUpdates []string
1✔
1624
        var dnFound bool //to check if the dn belongs to this cluster
1✔
1625
        cont.log.Info("Processing opflex device delete notification of ", dn)
1✔
1626
        cont.indexMutex.Lock()
1✔
1627
        for node, devices := range cont.nodeOpflexDevice {
2✔
1628
                for i, device := range devices {
2✔
1629
                        if device.GetDn() != dn {
2✔
1630
                                continue
1✔
1631
                        }
1632
                        dnFound = true
1✔
1633
                        cont.fabricPathLogger(node, device).
1✔
1634
                                Debug("Deleting opflex device path")
1✔
1635
                        devices = append(devices[:i], devices[i+1:]...)
1✔
1636
                        cont.nodeOpflexDevice[node] = devices
1✔
1637
                        cont.log.Info("Deleted opflex device of node ", node, ": ", dn)
1✔
1638
                        nodeUpdates = append(nodeUpdates, node)
1✔
1639
                        break
1✔
1640
                }
1641
                if len(devices) == 0 {
2✔
1642
                        delete(cont.nodeOpflexDevice, node)
1✔
1643
                }
1✔
1644
        }
1645

1646
        // For clusters other than OpenShift On OpenStack,
1647
        // openStackFabricPathDnMap will be empty
1648
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1649
                if _, ok := opflexOdevInfo.opflexODevDn[dn]; ok {
×
1650
                        cont.log.Info("Received OpenStack opflexODev delete notification for ", dn)
×
1651
                        delete(opflexOdevInfo.opflexODevDn, dn)
×
1652
                        if len(opflexOdevInfo.opflexODevDn) < 1 {
×
1653
                                delete(cont.openStackFabricPathDnMap, host)
×
1654
                                cont.log.Info("OpenStack opflexODev of host ", host, " is deleted from cache")
×
1655
                                dnFound = true
×
1656
                        } else {
×
1657
                                cont.openStackFabricPathDnMap[host] = opflexOdevInfo
×
1658
                        }
×
1659
                        break
×
1660
                }
1661
        }
1662
        cont.indexMutex.Unlock()
1✔
1663

1✔
1664
        if dnFound {
2✔
1665
                cont.postOpflexDeviceDelete(nodeUpdates)
1✔
1666
        }
1✔
1667
}
1668

1669
func (cont *AciController) writeApicSvc(key string, service *v1.Service) {
1✔
1670
        if cont.config.ChainedMode {
1✔
1671
                return
×
1672
        }
×
1673
        aobj := apicapi.NewVmmInjectedSvc(cont.vmmDomainProvider(),
1✔
1674
                cont.config.AciVmmDomain, cont.config.AciVmmController,
1✔
1675
                service.Namespace, service.Name)
1✔
1676
        aobjDn := aobj.GetDn()
1✔
1677
        aobj.SetAttr("guid", string(service.UID))
1✔
1678

1✔
1679
        svcns := service.ObjectMeta.Namespace
1✔
1680
        _, exists, err := cont.namespaceIndexer.GetByKey(svcns)
1✔
1681
        if err != nil {
1✔
1682
                cont.log.Error("Failed to lookup ns : ", svcns, " ", err)
×
1683
                return
×
1684
        }
×
1685
        if !exists {
2✔
1686
                cont.log.Debug("Namespace of service ", service.ObjectMeta.Name, ": ", svcns, " doesn't exist, hence not sending an update to the APIC")
1✔
1687
                return
1✔
1688
        }
1✔
1689

1690
        if !cont.serviceEndPoints.SetServiceApicObject(aobj, service) {
2✔
1691
                return
1✔
1692
        }
1✔
1693
        var setApicSvcDnsName bool
1✔
1694
        if len(cont.config.ApicHosts) != 0 && apicapi.ApicVersion >= "5.1" {
1✔
1695
                setApicSvcDnsName = true
×
1696
        }
×
1697
        // APIC model only allows one of these
1698
        for _, ingress := range service.Status.LoadBalancer.Ingress {
1✔
1699
                if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1700
                        aobj.SetAttr("lbIp", ingress.IP)
×
1701
                } else if ingress.Hostname != "" {
×
1702
                        ipList, err := net.LookupHost(ingress.Hostname)
×
1703
                        if err == nil && len(ipList) > 0 {
×
1704
                                aobj.SetAttr("lbIp", ipList[0])
×
1705
                        } else {
×
1706
                                cont.log.Errorf("Lookup: err: %v, ipList: %+v", err, ipList)
×
1707
                        }
×
1708
                }
1709
                break
×
1710
        }
1711
        if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != "None" {
2✔
1712
                aobj.SetAttr("clusterIp", service.Spec.ClusterIP)
1✔
1713
        }
1✔
1714

1715
        var t string
1✔
1716
        switch service.Spec.Type {
1✔
1717
        case v1.ServiceTypeClusterIP:
×
1718
                t = "clusterIp"
×
1719
        case v1.ServiceTypeNodePort:
×
1720
                t = "nodePort"
×
1721
        case v1.ServiceTypeLoadBalancer:
1✔
1722
                t = "loadBalancer"
1✔
1723
        case v1.ServiceTypeExternalName:
×
1724
                t = "externalName"
×
1725
        }
1726
        if t != "" {
2✔
1727
                aobj.SetAttr("type", t)
1✔
1728
        }
1✔
1729

1730
        if setApicSvcDnsName || cont.config.Flavor == "k8s-overlay" {
1✔
1731
                dnsName := fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace)
×
1732

×
1733
                for _, ingress := range service.Status.LoadBalancer.Ingress {
×
1734
                        if ingress.Hostname != "" {
×
1735
                                aobj.SetAttr("dnsName", ingress.Hostname)
×
1736
                        } else if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1737
                                aobj.SetAttr("dnsName", dnsName)
×
1738
                        }
×
1739
                }
1740
                if t == "clusterIp" || t == "nodePort" || t == "externalName" {
×
1741
                        aobj.SetAttr("dnsName", dnsName)
×
1742
                }
×
1743
        }
1744
        for _, port := range service.Spec.Ports {
2✔
1745
                proto := getProtocolStr(port.Protocol)
1✔
1746
                p := apicapi.NewVmmInjectedSvcPort(aobjDn,
1✔
1747
                        strconv.Itoa(int(port.Port)), proto, port.TargetPort.String())
1✔
1748
                p.SetAttr("nodePort", strconv.Itoa(int(port.NodePort)))
1✔
1749
                aobj.AddChild(p)
1✔
1750
        }
1✔
1751
        if cont.config.EnableVmmInjectedLabels && service.ObjectMeta.Labels != nil && apicapi.ApicVersion >= "5.2" {
1✔
1752
                for key, val := range service.ObjectMeta.Labels {
×
1753
                        newLabelKey := cont.aciNameForKey("label", key)
×
1754
                        label := apicapi.NewVmmInjectedLabel(aobj.GetDn(),
×
1755
                                newLabelKey, val)
×
1756
                        aobj.AddChild(label)
×
1757
                }
×
1758
        }
1759
        name := cont.aciNameForKey("service-vmm", key)
1✔
1760
        cont.log.Debug("Write Service Object: ", aobj)
1✔
1761
        cont.apicConn.WriteApicObjects(name, apicapi.ApicSlice{aobj})
1✔
1762
        cont.log.Debugf("svcObject: %+v", aobj)
1✔
1763
}
1764

1765
func removeAllConditions(conditions []metav1.Condition, conditionType string) []metav1.Condition {
1✔
1766
        i := 0
1✔
1767
        for _, cond := range conditions {
1✔
1768
                if cond.Type != conditionType {
×
1769
                        conditions[i] = cond
×
1770
                }
×
1771
        }
1772
        return conditions[:i]
1✔
1773
}
1774

1775
func (cont *AciController) updateServiceCondition(service *v1.Service, success bool, reason string, message string) bool {
1✔
1776
        conditionType := "LbIpamAllocation"
1✔
1777

1✔
1778
        var condition metav1.Condition
1✔
1779
        if success {
2✔
1780
                condition.Status = metav1.ConditionTrue
1✔
1781
        } else {
2✔
1782
                condition.Status = metav1.ConditionFalse
1✔
1783
                condition.Message = message
1✔
1784
        }
1✔
1785
        condition.Type = conditionType
1✔
1786
        condition.Reason = reason
1✔
1787
        condition.LastTransitionTime = metav1.Time{time.Now()}
1✔
1788
        for _, cond := range service.Status.Conditions {
2✔
1789
                if cond.Type == conditionType &&
1✔
1790
                        cond.Status == condition.Status &&
1✔
1791
                        cond.Message == condition.Message &&
1✔
1792
                        cond.Reason == condition.Reason {
2✔
1793
                        return false
1✔
1794
                }
1✔
1795
        }
1796

1797
        service.Status.Conditions = removeAllConditions(service.Status.Conditions, conditionType)
1✔
1798
        service.Status.Conditions = append(service.Status.Conditions, condition)
1✔
1799
        return true
1✔
1800
}
1801

1802
func (cont *AciController) validateRequestedIps(lbIpList []string) (net.IP, net.IP, bool) {
1✔
1803
        var ipv4, ipv6 net.IP
1✔
1804
        for _, lbIp := range lbIpList {
2✔
1805
                ip := net.ParseIP(lbIp)
1✔
1806
                if ip != nil {
2✔
1807
                        if ip.To4() != nil {
2✔
1808
                                if ipv4.Equal(net.IP{}) {
2✔
1809
                                        ipv4 = ip
1✔
1810
                                } else {
2✔
1811
                                        cont.log.Error("Annotation should have only one ipv4")
1✔
1812
                                        return ipv4, ipv6, false
1✔
1813
                                }
1✔
1814
                        } else if ip.To16() != nil {
2✔
1815
                                if ipv6.Equal(net.IP{}) {
2✔
1816
                                        ipv6 = ip
1✔
1817
                                } else {
2✔
1818
                                        cont.log.Error("Annotation should have only one ipv6")
1✔
1819
                                        return ipv4, ipv6, false
1✔
1820
                                }
1✔
1821
                        }
1822
                }
1823
        }
1824
        return ipv4, ipv6, true
1✔
1825
}
1826

1827
func (cont *AciController) returnUnusedStaticIngressIps(staticIngressIps, requestedIps []net.IP) {
1✔
1828
        for _, staticIp := range staticIngressIps {
2✔
1829
                found := false
1✔
1830
                for _, reqIp := range requestedIps {
2✔
1831
                        if reqIp.Equal(staticIp) {
2✔
1832
                                found = true
1✔
1833
                        }
1✔
1834
                }
1835
                if !found {
2✔
1836
                        returnIps(cont.staticServiceIps, []net.IP{staticIp})
1✔
1837
                }
1✔
1838
        }
1839
}
1840

1841
func (cont *AciController) allocateServiceIps(servicekey string,
1842
        service *v1.Service) bool {
1✔
1843
        logger := serviceLogger(cont.log, service)
1✔
1844
        cont.indexMutex.Lock()
1✔
1845
        meta, ok := cont.serviceMetaCache[servicekey]
1✔
1846
        if !ok {
2✔
1847
                meta = &serviceMeta{}
1✔
1848
                cont.serviceMetaCache[servicekey] = meta
1✔
1849

1✔
1850
                // Read any existing IPs and attempt to allocate them to the pod
1✔
1851
                for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1852
                        ip := net.ParseIP(ingress.IP)
1✔
1853
                        if ip == nil {
1✔
1854
                                continue
×
1855
                        }
1856
                        if ip.To4() != nil {
2✔
1857
                                if cont.serviceIps.GetV4IpCache()[0].RemoveIp(ip) {
2✔
1858
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1859
                                } else if cont.staticServiceIps.V4.RemoveIp(ip) {
3✔
1860
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1861
                                }
1✔
1862
                        } else if ip.To16() != nil {
2✔
1863
                                if cont.serviceIps.GetV6IpCache()[0].RemoveIp(ip) {
2✔
1864
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1865
                                } else if cont.staticServiceIps.V6.RemoveIp(ip) {
3✔
1866
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1867
                                }
1✔
1868
                        }
1869
                }
1870
        }
1871

1872
        if !cont.serviceSyncEnabled {
2✔
1873
                cont.indexMutex.Unlock()
1✔
1874
                return false
1✔
1875
        }
1✔
1876

1877
        var requestedIps []net.IP
1✔
1878
        // try to give the requested load balancer IP to the pod
1✔
1879
        lbIps, ok := service.ObjectMeta.Annotations[metadata.LbIpAnnotation]
1✔
1880
        if ok {
2✔
1881
                lbIpList := strings.Split(lbIps, ",")
1✔
1882
                ipv4, ipv6, valid := cont.validateRequestedIps(lbIpList)
1✔
1883
                if valid {
2✔
1884
                        if ipv4 != nil {
2✔
1885
                                requestedIps = append(requestedIps, ipv4)
1✔
1886
                        }
1✔
1887
                        if ipv6 != nil {
2✔
1888
                                requestedIps = append(requestedIps, ipv6)
1✔
1889
                        }
1✔
1890
                } else {
1✔
1891
                        cont.returnServiceIps(meta.ingressIps)
1✔
1892
                        cont.log.Error("Invalid LB IP annotation for service ", servicekey)
1✔
1893
                        condUpdated := cont.updateServiceCondition(service, false, "InvalidAnnotation", "Invalid Loadbalancer IP annotation")
1✔
1894
                        if condUpdated {
2✔
1895
                                _, err := cont.updateServiceStatus(service)
1✔
1896
                                if err != nil {
1✔
1897
                                        logger.Error("Failed to update service status : ", err)
×
1898
                                        cont.indexMutex.Unlock()
×
1899
                                        return true
×
1900
                                }
×
1901
                        }
1902
                        cont.indexMutex.Unlock()
1✔
1903
                        return false
1✔
1904
                }
1905
        } else {
1✔
1906
                requestedIp := net.ParseIP(service.Spec.LoadBalancerIP)
1✔
1907
                if requestedIp != nil {
2✔
1908
                        requestedIps = append(requestedIps, requestedIp)
1✔
1909
                }
1✔
1910
        }
1911
        if len(requestedIps) > 0 {
2✔
1912
                meta.requestedIps = []net.IP{}
1✔
1913
                for _, requestedIp := range requestedIps {
2✔
1914
                        hasRequestedIp := false
1✔
1915
                        for _, ip := range meta.staticIngressIps {
2✔
1916
                                if reflect.DeepEqual(requestedIp, ip) {
2✔
1917
                                        hasRequestedIp = true
1✔
1918
                                }
1✔
1919
                        }
1920
                        if !hasRequestedIp {
2✔
1921
                                if requestedIp.To4() != nil &&
1✔
1922
                                        cont.staticServiceIps.V4.RemoveIp(requestedIp) {
2✔
1923
                                        hasRequestedIp = true
1✔
1924
                                } else if requestedIp.To16() != nil &&
2✔
1925
                                        cont.staticServiceIps.V6.RemoveIp(requestedIp) {
2✔
1926
                                        hasRequestedIp = true
1✔
1927
                                }
1✔
1928
                        }
1929
                        if hasRequestedIp {
2✔
1930
                                meta.requestedIps = append(meta.requestedIps, requestedIp)
1✔
1931
                        }
1✔
1932
                }
1933
                cont.returnUnusedStaticIngressIps(meta.staticIngressIps, meta.requestedIps)
1✔
1934
                meta.staticIngressIps = meta.requestedIps
1✔
1935
                cont.returnServiceIps(meta.ingressIps)
1✔
1936
                meta.ingressIps = nil
1✔
1937
                // If no requested ips are allocatable
1✔
1938
                if len(meta.requestedIps) < 1 {
2✔
1939
                        logger.Error("No Requested Ip addresses available for service ", servicekey)
1✔
1940
                        condUpdated := cont.updateServiceCondition(service, false, "RequestedIpsNotAllocatable", "The requested ips for loadbalancer service are not available or not in extern static range")
1✔
1941
                        if condUpdated {
2✔
1942
                                _, err := cont.updateServiceStatus(service)
1✔
1943
                                if err != nil {
1✔
1944
                                        cont.indexMutex.Unlock()
×
1945
                                        logger.Error("Failed to update service status: ", err)
×
1946
                                        return true
×
1947
                                }
×
1948
                        }
1949
                        cont.indexMutex.Unlock()
1✔
1950
                        return false
1✔
1951
                }
1952
        } else if len(meta.requestedIps) > 0 {
1✔
1953
                meta.requestedIps = nil
×
1954
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
×
1955
                meta.staticIngressIps = nil
×
1956
        }
×
1957
        ingressIps := make([]net.IP, 0)
1✔
1958
        ingressIps = append(ingressIps, meta.ingressIps...)
1✔
1959
        ingressIps = append(ingressIps, meta.staticIngressIps...)
1✔
1960

1✔
1961
        var ipv4, ipv6 net.IP
1✔
1962
        for _, ip := range ingressIps {
2✔
1963
                if ip.To4() != nil {
2✔
1964
                        ipv4 = ip
1✔
1965
                } else if ip.To16() != nil {
3✔
1966
                        ipv6 = ip
1✔
1967
                }
1✔
1968
        }
1969
        var clusterIPv4, clusterIPv6 net.IP
1✔
1970
        clusterIPs := append([]string{service.Spec.ClusterIP}, service.Spec.ClusterIPs...)
1✔
1971
        for _, ipStr := range clusterIPs {
2✔
1972
                ip := net.ParseIP(ipStr)
1✔
1973
                if ip == nil {
1✔
1974
                        continue
×
1975
                }
1976
                if ip.To4() != nil && clusterIPv4 == nil {
2✔
1977
                        clusterIPv4 = ip
1✔
1978
                } else if ip.To16() != nil && strings.Contains(ip.String(), ":") && clusterIPv6 == nil {
3✔
1979
                        clusterIPv6 = ip
1✔
1980
                }
1✔
1981
        }
1982
        if clusterIPv4 != nil && ipv4 == nil {
2✔
1983
                if len(requestedIps) < 1 {
2✔
1984
                        ipv4, _ = cont.serviceIps.AllocateIp(true)
1✔
1985
                        if ipv4 != nil {
2✔
1986
                                ingressIps = append(ingressIps, ipv4)
1✔
1987
                        }
1✔
1988
                }
1989
        } else if clusterIPv4 == nil && ipv4 != nil {
1✔
1990
                cont.removeIpFromIngressIPList(&ingressIps, ipv4)
×
1991
        }
×
1992

1993
        if clusterIPv6 != nil && ipv6 == nil {
2✔
1994
                if len(requestedIps) < 1 {
2✔
1995
                        ipv6, _ = cont.serviceIps.AllocateIp(false)
1✔
1996
                        if ipv6 != nil {
2✔
1997
                                ingressIps = append(ingressIps, ipv6)
1✔
1998
                        }
1✔
1999
                }
2000
        } else if clusterIPv6 == nil && ipv6 != nil {
1✔
2001
                cont.removeIpFromIngressIPList(&ingressIps, ipv6)
×
2002
        }
×
2003

2004
        if len(requestedIps) < 1 {
2✔
2005
                meta.ingressIps = ingressIps
1✔
2006
        }
1✔
2007
        if ipv4 == nil && ipv6 == nil {
2✔
2008
                logger.Error("No IP addresses available for service")
1✔
2009
                cont.indexMutex.Unlock()
1✔
2010
                return true
1✔
2011
        }
1✔
2012
        cont.indexMutex.Unlock()
1✔
2013
        var newIngress []v1.LoadBalancerIngress
1✔
2014
        for _, ip := range meta.ingressIps {
2✔
2015
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2016
        }
1✔
2017
        for _, ip := range meta.staticIngressIps {
2✔
2018
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2019
        }
1✔
2020

2021
        ipUpdated := false
1✔
2022
        if !reflect.DeepEqual(newIngress, service.Status.LoadBalancer.Ingress) {
2✔
2023
                service.Status.LoadBalancer.Ingress = newIngress
1✔
2024

1✔
2025
                logger.WithFields(logrus.Fields{
1✔
2026
                        "status": service.Status.LoadBalancer.Ingress,
1✔
2027
                }).Info("Updating service load balancer status")
1✔
2028

1✔
2029
                ipUpdated = true
1✔
2030
        }
1✔
2031

2032
        success := true
1✔
2033
        reason := "Success"
1✔
2034
        message := ""
1✔
2035
        if len(requestedIps) > 0 && len(requestedIps) != len(meta.staticIngressIps) {
1✔
2036
                success = false
×
2037
                reason = "OneIpNotAllocatable"
×
2038
                message = "One of the requested Ips is not allocatable"
×
2039
        }
×
2040
        condUpdated := cont.updateServiceCondition(service, success, reason, message)
1✔
2041
        if ipUpdated || condUpdated {
2✔
2042
                _, err := cont.updateServiceStatus(service)
1✔
2043
                if err != nil {
1✔
2044
                        logger.Error("Failed to update service status: ", err)
×
2045
                        return true
×
2046
                }
×
2047
        }
2048
        return false
1✔
2049
}
2050

2051
func (cont *AciController) handleServiceDelete(servicekey string) bool {
1✔
2052
        if cont.config.ChainedMode {
1✔
2053
                return false
×
2054
        }
×
2055
        cont.clearLbService(servicekey)
1✔
2056
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("service-vmm",
1✔
2057
                servicekey))
1✔
2058
        return false
1✔
2059
}
2060

2061
func (cont *AciController) handleServiceUpdate(service *v1.Service) bool {
1✔
2062
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2063
        if err != nil {
1✔
2064
                serviceLogger(cont.log, service).
×
2065
                        Error("Could not create service key: ", err)
×
2066
                return false
×
2067
        }
×
2068
        if cont.config.ChainedMode {
1✔
2069
                return false
×
2070
        }
×
2071
        var requeue bool
1✔
2072
        isLoadBalancer := service.Spec.Type == v1.ServiceTypeLoadBalancer
1✔
2073
        if isLoadBalancer {
2✔
2074
                if *cont.config.AllocateServiceIps {
2✔
2075
                        requeue = cont.allocateServiceIps(servicekey, service)
1✔
2076
                }
1✔
2077
                cont.indexMutex.Lock()
1✔
2078
                if cont.serviceSyncEnabled {
2✔
2079
                        cont.indexMutex.Unlock()
1✔
2080
                        err = cont.updateServiceDeviceInstance(servicekey, service)
1✔
2081
                        if err != nil {
1✔
2082
                                serviceLogger(cont.log, service).
×
2083
                                        Error("Failed to update service device Instance: ", err)
×
2084
                                return true
×
2085
                        }
×
2086
                } else {
1✔
2087
                        cont.indexMutex.Unlock()
1✔
2088
                }
1✔
2089
        } else {
1✔
2090
                cont.clearLbService(servicekey)
1✔
2091
        }
1✔
2092
        cont.writeApicSvc(servicekey, service)
1✔
2093
        return requeue
1✔
2094
}
2095

2096
func (cont *AciController) clearLbService(servicekey string) {
1✔
2097
        cont.indexMutex.Lock()
1✔
2098
        if meta, ok := cont.serviceMetaCache[servicekey]; ok {
2✔
2099
                cont.returnServiceIps(meta.ingressIps)
1✔
2100
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
1✔
2101
                delete(cont.serviceMetaCache, servicekey)
1✔
2102
        }
1✔
2103
        cont.indexMutex.Unlock()
1✔
2104
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("svc", servicekey))
1✔
2105
}
2106

2107
func getEndpointsIps(endpoints *v1.Endpoints) map[string]bool {
1✔
2108
        ips := make(map[string]bool)
1✔
2109
        for _, subset := range endpoints.Subsets {
2✔
2110
                for _, addr := range subset.Addresses {
2✔
2111
                        ips[addr.IP] = true
1✔
2112
                }
1✔
2113
                for _, addr := range subset.NotReadyAddresses {
1✔
2114
                        ips[addr.IP] = true
×
2115
                }
×
2116
        }
2117
        return ips
1✔
2118
}
2119

2120
func getServiceTargetPorts(service *v1.Service) map[string]targetPort {
1✔
2121
        ports := make(map[string]targetPort)
1✔
2122
        for _, port := range service.Spec.Ports {
2✔
2123
                portNum := port.TargetPort.IntValue()
1✔
2124
                if portNum <= 0 {
2✔
2125
                        portNum = int(port.Port)
1✔
2126
                }
1✔
2127
                key := portProto(&port.Protocol) + "-num-" + strconv.Itoa(portNum)
1✔
2128
                ports[key] = targetPort{
1✔
2129
                        proto: port.Protocol,
1✔
2130
                        ports: []int{portNum},
1✔
2131
                }
1✔
2132
        }
2133
        return ports
1✔
2134
}
2135

2136
func (cont *AciController) endpointsAdded(obj interface{}) {
1✔
2137
        endpoints := obj.(*v1.Endpoints)
1✔
2138
        servicekey, err := cache.MetaNamespaceKeyFunc(obj.(*v1.Endpoints))
1✔
2139
        if err != nil {
1✔
2140
                cont.log.Error("Could not create service key: ", err)
×
2141
                return
×
2142
        }
×
2143

2144
        ips := getEndpointsIps(endpoints)
1✔
2145
        cont.indexMutex.Lock()
1✔
2146
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2147
        cont.queueIPNetPolUpdates(ips)
1✔
2148
        cont.indexMutex.Unlock()
1✔
2149

1✔
2150
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2151

1✔
2152
        cont.queueServiceUpdateByKey(servicekey)
1✔
2153
}
2154

2155
func (cont *AciController) endpointsDeleted(obj interface{}) {
1✔
2156
        endpoints, isEndpoints := obj.(*v1.Endpoints)
1✔
2157
        if !isEndpoints {
1✔
2158
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2159
                if !ok {
×
2160
                        cont.log.Error("Received unexpected object: ", obj)
×
2161
                        return
×
2162
                }
×
2163
                endpoints, ok = deletedState.Obj.(*v1.Endpoints)
×
2164
                if !ok {
×
2165
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpoints object: ", deletedState.Obj)
×
2166
                        return
×
2167
                }
×
2168
        }
2169
        servicekey, err := cache.MetaNamespaceKeyFunc(endpoints)
1✔
2170
        if err != nil {
1✔
2171
                cont.log.Error("Could not create service key: ", err)
×
2172
                return
×
2173
        }
×
2174

2175
        ips := getEndpointsIps(endpoints)
1✔
2176
        cont.indexMutex.Lock()
1✔
2177
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2178
        cont.queueIPNetPolUpdates(ips)
1✔
2179
        cont.indexMutex.Unlock()
1✔
2180

1✔
2181
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2182

1✔
2183
        cont.queueServiceUpdateByKey(servicekey)
1✔
2184
}
2185

2186
func (cont *AciController) endpointsUpdated(oldEps, newEps interface{}) {
1✔
2187
        oldendpoints := oldEps.(*v1.Endpoints)
1✔
2188
        newendpoints := newEps.(*v1.Endpoints)
1✔
2189
        servicekey, err := cache.MetaNamespaceKeyFunc(newendpoints)
1✔
2190
        if err != nil {
1✔
2191
                cont.log.Error("Could not create service key: ", err)
×
2192
                return
×
2193
        }
×
2194

2195
        oldIps := getEndpointsIps(oldendpoints)
1✔
2196
        newIps := getEndpointsIps(newendpoints)
1✔
2197
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2198
                cont.indexMutex.Lock()
1✔
2199
                cont.queueIPNetPolUpdates(oldIps)
1✔
2200
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2201
                cont.queueIPNetPolUpdates(newIps)
1✔
2202
                cont.indexMutex.Unlock()
1✔
2203
        }
1✔
2204

2205
        if !reflect.DeepEqual(oldendpoints.Subsets, newendpoints.Subsets) {
2✔
2206
                cont.queueEndpointsNetPolUpdates(oldendpoints)
1✔
2207
                cont.queueEndpointsNetPolUpdates(newendpoints)
1✔
2208
        }
1✔
2209

2210
        cont.queueServiceUpdateByKey(servicekey)
1✔
2211
}
2212

2213
func (cont *AciController) serviceAdded(obj interface{}) {
1✔
2214
        service := obj.(*v1.Service)
1✔
2215
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2216
        if err != nil {
1✔
2217
                serviceLogger(cont.log, service).
×
2218
                        Error("Could not create service key: ", err)
×
2219
                return
×
2220
        }
×
2221

2222
        ports := getServiceTargetPorts(service)
1✔
2223
        cont.indexMutex.Lock()
1✔
2224
        cont.queuePortNetPolUpdates(ports)
1✔
2225
        cont.updateTargetPortIndex(true, servicekey, nil, ports)
1✔
2226
        cont.indexMutex.Unlock()
1✔
2227

1✔
2228
        cont.queueServiceUpdateByKey(servicekey)
1✔
2229
}
2230

2231
func (cont *AciController) serviceUpdated(oldSvc, newSvc interface{}) {
1✔
2232
        oldservice := oldSvc.(*v1.Service)
1✔
2233
        newservice := newSvc.(*v1.Service)
1✔
2234
        servicekey, err := cache.MetaNamespaceKeyFunc(newservice)
1✔
2235
        if err != nil {
1✔
2236
                serviceLogger(cont.log, newservice).
×
2237
                        Error("Could not create service key: ", err)
×
2238
                return
×
2239
        }
×
2240
        oldPorts := getServiceTargetPorts(oldservice)
1✔
2241
        newPorts := getServiceTargetPorts(newservice)
1✔
2242
        if !reflect.DeepEqual(oldPorts, newPorts) {
1✔
2243
                cont.indexMutex.Lock()
×
2244
                cont.queuePortNetPolUpdates(oldPorts)
×
2245
                cont.updateTargetPortIndex(true, servicekey, oldPorts, newPorts)
×
2246
                cont.queuePortNetPolUpdates(newPorts)
×
2247
                cont.indexMutex.Unlock()
×
2248
        }
×
2249
        cont.queueServiceUpdateByKey(servicekey)
1✔
2250
}
2251

2252
func (cont *AciController) serviceDeleted(obj interface{}) {
1✔
2253
        service, isService := obj.(*v1.Service)
1✔
2254
        if !isService {
1✔
2255
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2256
                if !ok {
×
2257
                        serviceLogger(cont.log, service).
×
2258
                                Error("Received unexpected object: ", obj)
×
2259
                        return
×
2260
                }
×
2261
                service, ok = deletedState.Obj.(*v1.Service)
×
2262
                if !ok {
×
2263
                        serviceLogger(cont.log, service).
×
2264
                                Error("DeletedFinalStateUnknown contained non-Services object: ", deletedState.Obj)
×
2265
                        return
×
2266
                }
×
2267
        }
2268
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2269
        if err != nil {
1✔
2270
                serviceLogger(cont.log, service).
×
2271
                        Error("Could not create service key: ", err)
×
2272
                return
×
2273
        }
×
2274

2275
        ports := getServiceTargetPorts(service)
1✔
2276
        cont.indexMutex.Lock()
1✔
2277
        cont.updateTargetPortIndex(true, servicekey, ports, nil)
1✔
2278
        cont.queuePortNetPolUpdates(ports)
1✔
2279
        delete(cont.snatServices, servicekey)
1✔
2280
        cont.indexMutex.Unlock()
1✔
2281

1✔
2282
        deletedServiceKey := "DELETED_" + servicekey
1✔
2283
        cont.queueServiceUpdateByKey(deletedServiceKey)
1✔
2284
}
2285

2286
func (cont *AciController) serviceFullSync() {
1✔
2287
        cache.ListAll(cont.serviceIndexer, labels.Everything(),
1✔
2288
                func(sobj interface{}) {
2✔
2289
                        cont.queueServiceUpdate(sobj.(*v1.Service))
1✔
2290
                })
1✔
2291
}
2292

2293
func (cont *AciController) getEndpointSliceIps(endpointSlice *discovery.EndpointSlice) map[string]bool {
1✔
2294
        ips := make(map[string]bool)
1✔
2295
        for _, endpoints := range endpointSlice.Endpoints {
2✔
2296
                for _, addr := range endpoints.Addresses {
2✔
2297
                        ips[addr] = true
1✔
2298
                }
1✔
2299
        }
2300
        return ips
1✔
2301
}
2302

2303
func (cont *AciController) notReadyEndpointPresent(endpointSlice *discovery.EndpointSlice) bool {
×
2304
        for _, endpoints := range endpointSlice.Endpoints {
×
2305
                if (endpoints.Conditions.Ready != nil && !*endpoints.Conditions.Ready) &&
×
2306
                        (endpoints.Conditions.Terminating == nil || !*endpoints.Conditions.Terminating) {
×
2307
                        return true
×
2308
                }
×
2309
        }
2310
        return false
×
2311
}
2312

2313
func (cont *AciController) getEndpointSliceEpIps(endpoints *discovery.Endpoint) map[string]bool {
×
2314
        ips := make(map[string]bool)
×
2315
        for _, addr := range endpoints.Addresses {
×
2316
                ips[addr] = true
×
2317
        }
×
2318
        return ips
×
2319
}
2320

2321
func (cont *AciController) processDelayedEpSlices() {
1✔
2322
        var processEps []DelayedEpSlice
1✔
2323
        cont.indexMutex.Lock()
1✔
2324
        for i := 0; i < len(cont.delayedEpSlices); i++ {
1✔
2325
                delayedepslice := cont.delayedEpSlices[i]
×
2326
                if time.Now().After(delayedepslice.DelayedTime) {
×
2327
                        var toprocess DelayedEpSlice
×
2328
                        err := util.DeepCopyObj(&delayedepslice, &toprocess)
×
2329
                        if err != nil {
×
2330
                                cont.log.Error(err)
×
2331
                                continue
×
2332
                        }
2333
                        processEps = append(processEps, toprocess)
×
2334
                        cont.delayedEpSlices = append(cont.delayedEpSlices[:i], cont.delayedEpSlices[i+1:]...)
×
2335
                }
2336
        }
2337

2338
        cont.indexMutex.Unlock()
1✔
2339
        for _, epslice := range processEps {
1✔
2340
                //ignore the epslice if newly added endpoint is not ready
×
2341
                if cont.notReadyEndpointPresent(epslice.NewEpSlice) {
×
2342
                        cont.log.Debug("Ignoring the update as the new endpoint is not ready : ", epslice.NewEpSlice)
×
2343
                } else {
×
2344
                        cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice)
×
2345
                        cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice)
×
2346
                }
×
2347
        }
2348
}
2349

2350
func (cont *AciController) endpointSliceAdded(obj interface{}) {
1✔
2351
        endpointslice, ok := obj.(*discovery.EndpointSlice)
1✔
2352
        if !ok {
1✔
2353
                cont.log.Error("error processing Endpointslice object: ", obj)
×
2354
                return
×
2355
        }
×
2356
        servicekey, valid := getServiceKey(endpointslice)
1✔
2357
        if !valid {
1✔
2358
                return
×
2359
        }
×
2360
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2361
        cont.indexMutex.Lock()
1✔
2362
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2363
        cont.queueIPNetPolUpdates(ips)
1✔
2364
        cont.indexMutex.Unlock()
1✔
2365

1✔
2366
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2367

1✔
2368
        cont.queueServiceUpdateByKey(servicekey)
1✔
2369
        cont.log.Info("EndPointSlice Object Added: ", servicekey)
1✔
2370
}
2371

2372
func (cont *AciController) endpointSliceDeleted(obj interface{}) {
1✔
2373
        endpointslice, isEndpointslice := obj.(*discovery.EndpointSlice)
1✔
2374
        if !isEndpointslice {
1✔
2375
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2376
                if !ok {
×
2377
                        cont.log.Error("Received unexpected object: ", obj)
×
2378
                        return
×
2379
                }
×
2380
                endpointslice, ok = deletedState.Obj.(*discovery.EndpointSlice)
×
2381
                if !ok {
×
2382
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpointslice object: ", deletedState.Obj)
×
2383
                        return
×
2384
                }
×
2385
        }
2386
        servicekey, valid := getServiceKey(endpointslice)
1✔
2387
        if !valid {
1✔
2388
                return
×
2389
        }
×
2390
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2391
        cont.indexMutex.Lock()
1✔
2392
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2393
        cont.queueIPNetPolUpdates(ips)
1✔
2394
        cont.indexMutex.Unlock()
1✔
2395
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2396
        cont.queueServiceUpdateByKey(servicekey)
1✔
2397
}
2398

2399
// Checks if the given service is present in the user configured list of services
2400
// for pbr delay and if present, returns the servie specific delay if configured
2401
func (cont *AciController) svcInAddDelayList(name, ns string) (int, bool) {
×
2402
        for _, svc := range cont.config.ServiceGraphEndpointAddDelay.Services {
×
2403
                if svc.Name == name && svc.Namespace == ns {
×
2404
                        return svc.Delay, true
×
2405
                }
×
2406
        }
2407
        return 0, false
×
2408
}
2409

2410
// Check if the endpointslice update notification has any deletion of enpoint
2411
func (cont *AciController) isDeleteEndpointSlice(oldendpointslice, newendpointslice *discovery.EndpointSlice) bool {
×
2412
        del := false
×
2413

×
2414
        // if any endpoint is removed from endpontslice
×
2415
        if len(newendpointslice.Endpoints) < len(oldendpointslice.Endpoints) {
×
2416
                del = true
×
2417
        }
×
2418

2419
        if !del {
×
2420
                // if any one of the endpoint is in terminating state
×
2421
                for _, endpoint := range newendpointslice.Endpoints {
×
2422
                        if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
×
2423
                                del = true
×
2424
                                break
×
2425
                        }
2426
                }
2427
        }
2428
        if !del {
×
2429
                // if any one of endpoint moved from ready state to not-ready state
×
2430
                for ix := range oldendpointslice.Endpoints {
×
2431
                        oldips := cont.getEndpointSliceEpIps(&oldendpointslice.Endpoints[ix])
×
2432
                        for newIx := range newendpointslice.Endpoints {
×
2433
                                newips := cont.getEndpointSliceEpIps(&newendpointslice.Endpoints[newIx])
×
2434
                                if reflect.DeepEqual(oldips, newips) {
×
2435
                                        if (oldendpointslice.Endpoints[ix].Conditions.Ready != nil && *oldendpointslice.Endpoints[ix].Conditions.Ready) &&
×
2436
                                                (newendpointslice.Endpoints[newIx].Conditions.Ready != nil && !*newendpointslice.Endpoints[newIx].Conditions.Ready) {
×
2437
                                                del = true
×
2438
                                        }
×
2439
                                        break
×
2440
                                }
2441
                        }
2442
                }
2443
        }
2444
        return del
×
2445
}
2446

2447
func (cont *AciController) doendpointSliceUpdatedDelay(oldendpointslice *discovery.EndpointSlice,
2448
        newendpointslice *discovery.EndpointSlice) {
×
2449
        svc, ns, valid := getServiceNameAndNs(newendpointslice)
×
2450
        if !valid {
×
2451
                return
×
2452
        }
×
2453
        svckey, valid := getServiceKey(newendpointslice)
×
2454
        if !valid {
×
2455
                return
×
2456
        }
×
2457
        delay := cont.config.ServiceGraphEndpointAddDelay.Delay
×
2458
        svcDelay, exists := cont.svcInAddDelayList(svc, ns)
×
2459
        if svcDelay > 0 {
×
2460
                delay = svcDelay
×
2461
        }
×
2462
        delayedsvc := exists && delay > 0
×
2463
        if delayedsvc {
×
2464
                cont.log.Debug("Delay of ", delay, " seconds is applicable for svc :", svc, " in ns: ", ns)
×
2465
                var delayedepslice DelayedEpSlice
×
2466
                delayedepslice.OldEpSlice = oldendpointslice
×
2467
                delayedepslice.ServiceKey = svckey
×
2468
                delayedepslice.NewEpSlice = newendpointslice
×
2469
                currentTime := time.Now()
×
2470
                delayedepslice.DelayedTime = currentTime.Add(time.Duration(delay) * time.Second)
×
2471
                cont.indexMutex.Lock()
×
2472
                cont.delayedEpSlices = append(cont.delayedEpSlices, &delayedepslice)
×
2473
                cont.indexMutex.Unlock()
×
2474
        } else {
×
2475
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2476
        }
×
2477

2478
        if delayedsvc && cont.isDeleteEndpointSlice(oldendpointslice, newendpointslice) {
×
2479
                cont.log.Debug("Proceeding by ignoring delay as the update is due to delete of endpoint")
×
2480
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2481
        }
×
2482
}
2483

2484
func (cont *AciController) endpointSliceUpdated(oldobj, newobj interface{}) {
1✔
2485
        oldendpointslice, ok := oldobj.(*discovery.EndpointSlice)
1✔
2486
        if !ok {
1✔
2487
                cont.log.Error("error processing Endpointslice object: ", oldobj)
×
2488
                return
×
2489
        }
×
2490
        newendpointslice, ok := newobj.(*discovery.EndpointSlice)
1✔
2491
        if !ok {
1✔
2492
                cont.log.Error("error processing Endpointslice object: ", newobj)
×
2493
                return
×
2494
        }
×
2495
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2496
                cont.doendpointSliceUpdatedDelay(oldendpointslice, newendpointslice)
×
2497
        } else {
1✔
2498
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
1✔
2499
        }
1✔
2500
}
2501

2502
func (cont *AciController) doendpointSliceUpdated(oldendpointslice *discovery.EndpointSlice,
2503
        newendpointslice *discovery.EndpointSlice) {
1✔
2504
        servicekey, valid := getServiceKey(newendpointslice)
1✔
2505
        if !valid {
1✔
2506
                return
×
2507
        }
×
2508
        oldIps := cont.getEndpointSliceIps(oldendpointslice)
1✔
2509
        newIps := cont.getEndpointSliceIps(newendpointslice)
1✔
2510
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2511
                cont.indexMutex.Lock()
1✔
2512
                cont.queueIPNetPolUpdates(oldIps)
1✔
2513
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2514
                cont.queueIPNetPolUpdates(newIps)
1✔
2515
                cont.indexMutex.Unlock()
1✔
2516
        }
1✔
2517

2518
        if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) {
2✔
2519
                cont.queueEndpointSliceNetPolUpdates(oldendpointslice)
1✔
2520
                cont.queueEndpointSliceNetPolUpdates(newendpointslice)
1✔
2521
        }
1✔
2522
        cont.log.Debug("EndPointSlice Object Update: ", servicekey)
1✔
2523
        cont.queueServiceUpdateByKey(servicekey)
1✔
2524
}
2525

2526
func (cont *AciController) queueEndpointSliceNetPolUpdates(endpointslice *discovery.EndpointSlice) {
1✔
2527
        for _, endpoint := range endpointslice.Endpoints {
2✔
2528
                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" ||
1✔
2529
                        endpoint.TargetRef.Namespace == "" || endpoint.TargetRef.Name == "" {
2✔
2530
                        continue
1✔
2531
                }
2532
                if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
1✔
2533
                        continue
×
2534
                }
2535
                podkey := endpoint.TargetRef.Namespace + "/" + endpoint.TargetRef.Name
1✔
2536
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
2537
                ps := make(map[string]bool)
1✔
2538
                for _, npkey := range npkeys {
2✔
2539
                        cont.queueNetPolUpdateByKey(npkey)
1✔
2540
                }
1✔
2541
                // Process if the  any matching namedport wildcard policy is present
2542
                // ignore np already processed policies
2543
                cont.queueMatchingNamedNp(ps, podkey)
1✔
2544
        }
2545
}
2546

2547
func getServiceKey(endPointSlice *discovery.EndpointSlice) (string, bool) {
1✔
2548
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
1✔
2549
        if !ok {
1✔
2550
                return "", false
×
2551
        }
×
2552
        return endPointSlice.ObjectMeta.Namespace + "/" + serviceName, true
1✔
2553
}
2554

2555
func getServiceNameAndNs(endPointSlice *discovery.EndpointSlice) (string, string, bool) {
×
2556
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
×
2557
        if !ok {
×
2558
                return "", "", false
×
2559
        }
×
2560
        return serviceName, endPointSlice.ObjectMeta.Namespace, true
×
2561
}
2562

2563
// can be called with index lock
2564
func (sep *serviceEndpoint) UpdateServicesForNode(nodename string) {
1✔
2565
        cont := sep.cont
1✔
2566
        cache.ListAll(cont.endpointsIndexer, labels.Everything(),
1✔
2567
                func(endpointsobj interface{}) {
2✔
2568
                        endpoints := endpointsobj.(*v1.Endpoints)
1✔
2569
                        for _, subset := range endpoints.Subsets {
2✔
2570
                                for _, addr := range subset.Addresses {
2✔
2571
                                        if addr.NodeName != nil && *addr.NodeName == nodename {
2✔
2572
                                                servicekey, err :=
1✔
2573
                                                        cache.MetaNamespaceKeyFunc(endpointsobj.(*v1.Endpoints))
1✔
2574
                                                if err != nil {
1✔
2575
                                                        cont.log.Error("Could not create endpoints key: ", err)
×
2576
                                                        return
×
2577
                                                }
×
2578
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2579
                                                return
1✔
2580
                                        }
2581
                                }
2582
                        }
2583
                })
2584
}
2585

2586
func (seps *serviceEndpointSlice) UpdateServicesForNode(nodename string) {
1✔
2587
        // 1. List all the endpointslice and check for matching nodename
1✔
2588
        // 2. if it matches trigger the Service update and mark it visited
1✔
2589
        cont := seps.cont
1✔
2590
        visited := make(map[string]bool)
1✔
2591
        cache.ListAll(cont.endpointSliceIndexer, labels.Everything(),
1✔
2592
                func(endpointSliceobj interface{}) {
2✔
2593
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2594
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2595
                                if endpoint.NodeName != nil && *endpoint.NodeName == nodename {
2✔
2596
                                        servicekey, valid := getServiceKey(endpointSlices)
1✔
2597
                                        if !valid {
1✔
2598
                                                return
×
2599
                                        }
×
2600
                                        if _, ok := visited[servicekey]; !ok {
2✔
2601
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2602
                                                visited[servicekey] = true
1✔
2603
                                                return
1✔
2604
                                        }
1✔
2605
                                }
2606
                        }
2607
                })
2608
}
2609
func (cont *AciController) setNodeMap(nodeMap map[string]*metadata.ServiceEndpoint, nodeName string) {
1✔
2610
        nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
2611
        if !ok {
2✔
2612
                return
1✔
2613
        }
1✔
2614
        _, ok = cont.fabricPathForNode(nodeName)
1✔
2615
        if !ok {
2✔
2616
                return
1✔
2617
        }
1✔
2618
        nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
2619
}
2620

2621
// 2 cases when epslices corresponding to given service is presnt in delayedEpSlices:
2622
//  1. endpoint not present in delayedEpSlices of the service
2623
//  2. endpoint present in delayedEpSlices of the service but in not ready state
2624
//
2625
// indexMutex lock must be acquired before calling the function
2626
func (cont *AciController) isDelayedEndpoint(endpoint *discovery.Endpoint, svckey string) bool {
×
2627
        delayed := false
×
2628
        endpointips := cont.getEndpointSliceEpIps(endpoint)
×
2629
        for _, delayedepslices := range cont.delayedEpSlices {
×
2630
                if delayedepslices.ServiceKey == svckey {
×
2631
                        var found bool
×
2632
                        epslice := delayedepslices.OldEpSlice
×
2633
                        for ix := range epslice.Endpoints {
×
2634
                                epips := cont.getEndpointSliceEpIps(&epslice.Endpoints[ix])
×
2635
                                if reflect.DeepEqual(endpointips, epips) {
×
2636
                                        // case 2
×
2637
                                        if epslice.Endpoints[ix].Conditions.Ready != nil && !*epslice.Endpoints[ix].Conditions.Ready {
×
2638
                                                delayed = true
×
2639
                                        }
×
2640
                                        found = true
×
2641
                                }
2642
                        }
2643
                        // case 1
2644
                        if !found {
×
2645
                                delayed = true
×
2646
                        }
×
2647
                }
2648
        }
2649
        return delayed
×
2650
}
2651

2652
// set nodemap only if endoint is ready and not in delayedEpSlices
2653
func (cont *AciController) setNodeMapDelay(nodeMap map[string]*metadata.ServiceEndpoint,
2654
        endpoint *discovery.Endpoint, service *v1.Service) {
×
2655
        svckey, err := cache.MetaNamespaceKeyFunc(service)
×
2656
        if err != nil {
×
2657
                cont.log.Error("Could not create service key: ", err)
×
2658
                return
×
2659
        }
×
2660
        if cont.config.NoWaitForServiceEpReadiness ||
×
2661
                (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {
×
2662
                if endpoint.NodeName != nil && *endpoint.NodeName != "" {
×
2663
                        // donot setNodeMap for endpoint if:
×
2664
                        //   endpoint is newly added
×
2665
                        //   endpoint status changed from not ready to ready
×
2666
                        if !cont.isDelayedEndpoint(endpoint, svckey) {
×
2667
                                cont.setNodeMap(nodeMap, *endpoint.NodeName)
×
2668
                        }
×
2669
                }
2670
        }
2671
}
2672

2673
func (sep *serviceEndpoint) GetnodesMetadata(key string,
2674
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2675
        cont := sep.cont
1✔
2676
        endpointsobj, exists, err := cont.endpointsIndexer.GetByKey(key)
1✔
2677
        if err != nil {
1✔
2678
                cont.log.Error("Could not lookup endpoints for " +
×
2679
                        key + ": " + err.Error())
×
2680
        }
×
2681
        if exists && endpointsobj != nil {
2✔
2682
                endpoints := endpointsobj.(*v1.Endpoints)
1✔
2683
                for _, subset := range endpoints.Subsets {
2✔
2684
                        for _, addr := range subset.Addresses {
2✔
2685
                                if addr.NodeName == nil {
2✔
2686
                                        continue
1✔
2687
                                }
2688
                                cont.setNodeMap(nodeMap, *addr.NodeName)
1✔
2689
                        }
2690
                }
2691
        }
2692
        cont.log.Info("NodeMap: ", nodeMap)
1✔
2693
}
2694

2695
func (seps *serviceEndpointSlice) GetnodesMetadata(key string,
2696
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2697
        cont := seps.cont
1✔
2698
        // 1. Get all the Endpoint slices matching the label service-name
1✔
2699
        // 2. update the node map matching with endpoints nodes name
1✔
2700
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2701
        selector := labels.SelectorFromSet(label)
1✔
2702
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2703
                func(endpointSliceobj interface{}) {
2✔
2704
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2705
                        for ix := range endpointSlices.Endpoints {
2✔
2706
                                if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2707
                                        cont.setNodeMapDelay(nodeMap, &endpointSlices.Endpoints[ix], service)
×
2708
                                } else if cont.config.NoWaitForServiceEpReadiness ||
1✔
2709
                                        (endpointSlices.Endpoints[ix].Conditions.Ready != nil && *endpointSlices.Endpoints[ix].Conditions.Ready) {
2✔
2710
                                        if endpointSlices.Endpoints[ix].NodeName != nil && *endpointSlices.Endpoints[ix].NodeName != "" {
2✔
2711
                                                cont.setNodeMap(nodeMap, *endpointSlices.Endpoints[ix].NodeName)
1✔
2712
                                        }
1✔
2713
                                }
2714
                        }
2715
                })
2716
        cont.log.Debug("NodeMap: ", nodeMap)
1✔
2717
}
2718

2719
func (sep *serviceEndpoint) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2720
        cont := sep.cont
1✔
2721
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
2722
        if err != nil {
1✔
2723
                serviceLogger(cont.log, service).
×
2724
                        Error("Could not create service key: ", err)
×
2725
                return false
×
2726
        }
×
2727
        endpointsobj, _, err := cont.endpointsIndexer.GetByKey(key)
1✔
2728
        if err != nil {
1✔
2729
                cont.log.Error("Could not lookup endpoints for " +
×
2730
                        key + ": " + err.Error())
×
2731
                return false
×
2732
        }
×
2733
        if endpointsobj != nil {
2✔
2734
                for _, subset := range endpointsobj.(*v1.Endpoints).Subsets {
2✔
2735
                        for _, addr := range subset.Addresses {
2✔
2736
                                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" {
1✔
2737
                                        continue
×
2738
                                }
2739
                                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(),
1✔
2740
                                        addr.TargetRef.Name))
1✔
2741
                        }
2742
                }
2743
        }
2744
        return true
1✔
2745
}
2746

2747
func (seps *serviceEndpointSlice) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2748
        cont := seps.cont
1✔
2749
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2750
        selector := labels.SelectorFromSet(label)
1✔
2751
        epcount := 0
1✔
2752
        childs := make(map[string]struct{})
1✔
2753
        var exists = struct{}{}
1✔
2754
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2755
                func(endpointSliceobj interface{}) {
2✔
2756
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2757
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2758
                                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
1✔
2759
                                        continue
×
2760
                                }
2761
                                epcount++
1✔
2762
                                childs[endpoint.TargetRef.Name] = exists
1✔
2763
                                cont.log.Debug("EndPoint added: ", endpoint.TargetRef.Name)
1✔
2764
                        }
2765
                })
2766
        for child := range childs {
2✔
2767
                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(), child))
1✔
2768
        }
1✔
2769
        return epcount != 0
1✔
2770
}
2771

2772
func getProtocolStr(proto v1.Protocol) string {
1✔
2773
        var protostring string
1✔
2774
        switch proto {
1✔
2775
        case v1.ProtocolUDP:
1✔
2776
                protostring = "udp"
1✔
2777
        case v1.ProtocolTCP:
1✔
2778
                protostring = "tcp"
1✔
2779
        case v1.ProtocolSCTP:
×
2780
                protostring = "sctp"
×
2781
        default:
×
2782
                protostring = "tcp"
×
2783
        }
2784
        return protostring
1✔
2785
}
2786

2787
func (cont *AciController) removeIpFromIngressIPList(ingressIps *[]net.IP, ip net.IP) {
×
2788
        cont.returnServiceIps([]net.IP{ip})
×
2789
        index := -1
×
2790
        for i, v := range *ingressIps {
×
2791
                if v.Equal(ip) {
×
2792
                        index = i
×
2793
                        break
×
2794
                }
2795
        }
2796
        if index == -1 {
×
2797
                return
×
2798
        }
×
2799
        *ingressIps = append((*ingressIps)[:index], (*ingressIps)[index+1:]...)
×
2800
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc