• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11758

30 Mar 2026 12:15PM UTC coverage: 63.271% (-0.04%) from 63.308%
11758

Pull #1697

travis-pro

allenantony-oc
Fix OpenStack system ID lookup for ACI 6.2.x

In ACI 6.2.x, opflexIDEp.containerName changed from node name
to VM UUID, breaking setOpenStackSystemId which filtered
by containerName.

Instead of relying on containerName for filtering,
the node's uplink MAC is now used.

The hostagent now writes the node's uplink MAC as a node
annotation at startup. The controller reads this MAC and queries
APIC for the OpenStack opflexIDEp with a matching MAC to extract
the system ID.
Pull Request #1697: Fix OpenStack system ID lookup for ACI 6.2.x

1 of 22 new or added lines in 2 files covered. (4.55%)

9 existing lines in 2 files now uncovered.

13585 of 21471 relevant lines covered (63.27%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.38
/pkg/controller/services.go
1
// Copyright 2016 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "errors"
19
        "fmt"
20
        "net"
21
        "reflect"
22
        "regexp"
23
        "sort"
24
        "strconv"
25
        "strings"
26
        "time"
27

28
        "github.com/noironetworks/aci-containers/pkg/apicapi"
29
        "github.com/noironetworks/aci-containers/pkg/metadata"
30
        "github.com/noironetworks/aci-containers/pkg/util"
31
        "github.com/sirupsen/logrus"
32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
        "k8s.io/apimachinery/pkg/fields"
36
        "k8s.io/apimachinery/pkg/labels"
37
        "k8s.io/apimachinery/pkg/util/intstr"
38
        "k8s.io/client-go/kubernetes"
39
        "k8s.io/client-go/tools/cache"
40
)
41

42
const (
	// DefaultServiceContractScope is the default service contract scope value.
	DefaultServiceContractScope = "context"

	// DefaultServiceExtSubNetShared is the default for the service external
	// subnet scope setting that enables shared security.
	DefaultServiceExtSubNetShared = false
)
47

48
func (cont *AciController) initEndpointsInformerFromClient(
49
        kubeClient kubernetes.Interface) {
×
50
        cont.initEndpointsInformerBase(
×
51
                cache.NewListWatchFromClient(
×
52
                        kubeClient.CoreV1().RESTClient(), "endpoints",
×
53
                        metav1.NamespaceAll, fields.Everything()))
×
54
}
×
55

56
func (cont *AciController) initEndpointSliceInformerFromClient(
57
        kubeClient kubernetes.Interface) {
×
58
        cont.initEndpointSliceInformerBase(
×
59
                cache.NewListWatchFromClient(
×
60
                        kubeClient.DiscoveryV1().RESTClient(), "endpointslices",
×
61
                        metav1.NamespaceAll, fields.Everything()))
×
62
}
×
63

64
func (cont *AciController) initEndpointSliceInformerBase(listWatch *cache.ListWatch) {
1✔
65
        cont.endpointSliceIndexer, cont.endpointSliceInformer = cache.NewIndexerInformer(
1✔
66
                listWatch, &discovery.EndpointSlice{}, 0,
1✔
67
                cache.ResourceEventHandlerFuncs{
1✔
68
                        AddFunc: func(obj interface{}) {
2✔
69
                                cont.endpointSliceAdded(obj)
1✔
70
                        },
1✔
71
                        UpdateFunc: func(oldobj interface{}, newobj interface{}) {
1✔
72
                                cont.endpointSliceUpdated(oldobj, newobj)
1✔
73
                        },
1✔
74
                        DeleteFunc: func(obj interface{}) {
1✔
75
                                cont.endpointSliceDeleted(obj)
1✔
76
                        },
1✔
77
                },
78
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
79
        )
80
}
81

82
func (cont *AciController) initEndpointsInformerBase(listWatch *cache.ListWatch) {
1✔
83
        cont.endpointsIndexer, cont.endpointsInformer = cache.NewIndexerInformer(
1✔
84
                listWatch, &v1.Endpoints{}, 0,
1✔
85
                cache.ResourceEventHandlerFuncs{
1✔
86
                        AddFunc: func(obj interface{}) {
2✔
87
                                cont.endpointsAdded(obj)
1✔
88
                        },
1✔
89
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
90
                                cont.endpointsUpdated(old, new)
1✔
91
                        },
1✔
92
                        DeleteFunc: func(obj interface{}) {
1✔
93
                                cont.endpointsDeleted(obj)
1✔
94
                        },
1✔
95
                },
96
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
97
        )
98
}
99

100
func (cont *AciController) initServiceInformerFromClient(
101
        kubeClient *kubernetes.Clientset) {
×
102
        cont.initServiceInformerBase(
×
103
                cache.NewListWatchFromClient(
×
104
                        kubeClient.CoreV1().RESTClient(), "services",
×
105
                        metav1.NamespaceAll, fields.Everything()))
×
106
}
×
107

108
func (cont *AciController) initServiceInformerBase(listWatch *cache.ListWatch) {
1✔
109
        cont.serviceIndexer, cont.serviceInformer = cache.NewIndexerInformer(
1✔
110
                listWatch, &v1.Service{}, 0,
1✔
111
                cache.ResourceEventHandlerFuncs{
1✔
112
                        AddFunc: func(obj interface{}) {
2✔
113
                                cont.serviceAdded(obj)
1✔
114
                        },
1✔
115
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
116
                                cont.serviceUpdated(old, new)
1✔
117
                        },
1✔
118
                        DeleteFunc: func(obj interface{}) {
×
119
                                cont.serviceDeleted(obj)
×
120
                        },
×
121
                },
122
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
123
        )
124
}
125

126
func serviceLogger(log *logrus.Logger, as *v1.Service) *logrus.Entry {
1✔
127
        return log.WithFields(logrus.Fields{
1✔
128
                "namespace": as.ObjectMeta.Namespace,
1✔
129
                "name":      as.ObjectMeta.Name,
1✔
130
                "type":      as.Spec.Type,
1✔
131
        })
1✔
132
}
1✔
133

134
func (cont *AciController) queueIPNetPolUpdates(ips map[string]bool) {
1✔
135
        for ipStr := range ips {
2✔
136
                ip := net.ParseIP(ipStr)
1✔
137
                if ip == nil {
1✔
138
                        continue
×
139
                }
140
                entries, err := cont.netPolSubnetIndex.ContainingNetworks(ip)
1✔
141
                if err != nil {
1✔
142
                        cont.log.Error("Corrupted network policy IP index, err: ", err)
×
143
                        return
×
144
                }
×
145
                for _, entry := range entries {
2✔
146
                        for npkey := range entry.(*ipIndexEntry).keys {
2✔
147
                                cont.queueNetPolUpdateByKey(npkey)
1✔
148
                        }
1✔
149
                }
150
        }
151
}
152

153
func (cont *AciController) queuePortNetPolUpdates(ports map[string]targetPort) {
1✔
154
        for portkey := range ports {
2✔
155
                entry := cont.targetPortIndex[portkey]
1✔
156
                if entry == nil {
2✔
157
                        continue
1✔
158
                }
159
                for npkey := range entry.networkPolicyKeys {
2✔
160
                        cont.queueNetPolUpdateByKey(npkey)
1✔
161
                }
1✔
162
        }
163
}
164

165
func (cont *AciController) queueNetPolForEpAddrs(addrs []v1.EndpointAddress) {
1✔
166
        for _, addr := range addrs {
2✔
167
                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" ||
1✔
168
                        addr.TargetRef.Namespace == "" || addr.TargetRef.Name == "" {
2✔
169
                        continue
1✔
170
                }
171
                podkey := addr.TargetRef.Namespace + "/" + addr.TargetRef.Name
1✔
172
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
173
                ps := make(map[string]bool)
1✔
174
                for _, npkey := range npkeys {
2✔
175
                        cont.queueNetPolUpdateByKey(npkey)
1✔
176
                        ps[npkey] = true
1✔
177
                }
1✔
178
                // Process if the  any matching namedport wildcard policy is present
179
                // ignore np already processed policies
180
                cont.queueMatchingNamedNp(ps, podkey)
1✔
181
        }
182
}
183

184
func (cont *AciController) queueMatchingNamedNp(served map[string]bool, podkey string) {
1✔
185
        cont.indexMutex.Lock()
1✔
186
        for npkey := range cont.nmPortNp {
2✔
187
                if _, ok := served[npkey]; !ok {
2✔
188
                        if cont.checkPodNmpMatchesNp(npkey, podkey) {
2✔
189
                                cont.queueNetPolUpdateByKey(npkey)
1✔
190
                        }
1✔
191
                }
192
        }
193
        cont.indexMutex.Unlock()
1✔
194
}
195
func (cont *AciController) queueEndpointsNetPolUpdates(endpoints *v1.Endpoints) {
1✔
196
        for _, subset := range endpoints.Subsets {
2✔
197
                cont.queueNetPolForEpAddrs(subset.Addresses)
1✔
198
                cont.queueNetPolForEpAddrs(subset.NotReadyAddresses)
1✔
199
        }
1✔
200
}
201

202
func (cont *AciController) returnServiceIps(ips []net.IP) {
1✔
203
        for _, ip := range ips {
2✔
204
                if ip.To4() != nil {
2✔
205
                        cont.serviceIps.DeallocateIp(ip)
1✔
206
                } else if ip.To16() != nil {
3✔
207
                        cont.serviceIps.DeallocateIp(ip)
1✔
208
                }
1✔
209
        }
210
}
211

212
func returnIps(pool *netIps, ips []net.IP) {
1✔
213
        for _, ip := range ips {
2✔
214
                if ip.To4() != nil {
2✔
215
                        pool.V4.AddIp(ip)
1✔
216
                } else if ip.To16() != nil {
3✔
217
                        pool.V6.AddIp(ip)
1✔
218
                }
1✔
219
        }
220
}
221

222
func (cont *AciController) staticMonPolName() string {
1✔
223
        return cont.aciNameForKey("monPol", cont.env.ServiceBd())
1✔
224
}
1✔
225

226
func (cont *AciController) staticMonPolDn() string {
1✔
227
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
228
                return fmt.Sprintf("uni/tn-%s/ipslaMonitoringPol-%s",
1✔
229
                        cont.config.AciVrfTenant, cont.staticMonPolName())
1✔
230
        }
1✔
231
        return ""
×
232
}
233

234
func (cont *AciController) staticServiceObjs() apicapi.ApicSlice {
1✔
235
        var serviceObjs apicapi.ApicSlice
1✔
236

1✔
237
        // Service bridge domain
1✔
238
        bdName := cont.aciNameForKey("bd", cont.env.ServiceBd())
1✔
239
        bd := apicapi.NewFvBD(cont.config.AciVrfTenant, bdName)
1✔
240
        if apicapi.ApicVersion >= "6.0(4c)" {
1✔
241
                bd.SetAttr("serviceBdRoutingDisable", "yes")
×
242
        }
×
243
        bd.SetAttr("arpFlood", "yes")
1✔
244
        bd.SetAttr("ipLearning", "no")
1✔
245
        bd.SetAttr("unkMacUcastAct", cont.config.UnknownMacUnicastAction)
1✔
246
        bdToOut := apicapi.NewRsBdToOut(bd.GetDn(), cont.config.AciL3Out)
1✔
247
        bd.AddChild(bdToOut)
1✔
248
        bdToVrf := apicapi.NewRsCtx(bd.GetDn(), cont.config.AciVrf)
1✔
249
        bd.AddChild(bdToVrf)
1✔
250

1✔
251
        bdn := bd.GetDn()
1✔
252
        for _, cidr := range cont.config.NodeServiceSubnets {
2✔
253
                sn := apicapi.NewFvSubnet(bdn, cidr)
1✔
254
                bd.AddChild(sn)
1✔
255
        }
1✔
256
        serviceObjs = append(serviceObjs, bd)
1✔
257

1✔
258
        // Service IP SLA monitoring policy
1✔
259
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
260
                monPol := apicapi.NewFvIPSLAMonitoringPol(cont.config.AciVrfTenant,
1✔
261
                        cont.staticMonPolName())
1✔
262
                monPol.SetAttr("slaFrequency",
1✔
263
                        strconv.Itoa(cont.config.AciServiceMonitorInterval))
1✔
264
                serviceObjs = append(serviceObjs, monPol)
1✔
265
        }
1✔
266

267
        return serviceObjs
1✔
268
}
269

270
func (cont *AciController) initStaticServiceObjs() {
1✔
271
        cont.apicConn.WriteApicObjects(cont.config.AciPrefix+"_service_static",
1✔
272
                cont.staticServiceObjs())
1✔
273
}
1✔
274

275
func (cont *AciController) updateServicesForNode(nodename string) {
1✔
276
        cont.serviceEndPoints.UpdateServicesForNode(nodename)
1✔
277
}
1✔
278

279
// must have index lock
280
func (cont *AciController) getActiveFabricPathDn(node string) string {
×
281
        var fabricPathDn string
×
282
        sz := len(cont.nodeOpflexDevice[node])
×
283
        for i := range cont.nodeOpflexDevice[node] {
×
284
                device := cont.nodeOpflexDevice[node][sz-1-i]
×
285
                if device.GetAttrStr("state") == "connected" {
×
286
                        fabricPathDn = device.GetAttrStr("fabricPathDn")
×
287
                        break
×
288
                }
289
        }
290
        return fabricPathDn
×
291
}
292

293
func (cont *AciController) getOpflexOdevCount(node string) (int, string) {
×
294
        fabricPathDnCount := make(map[string]int)
×
295
        var maxCount int
×
296
        var fabricPathDn string
×
297
        for _, device := range cont.nodeOpflexDevice[node] {
×
298
                fabricpathdn := device.GetAttrStr("fabricPathDn")
×
299
                count, ok := fabricPathDnCount[fabricpathdn]
×
300
                if ok {
×
301
                        fabricPathDnCount[fabricpathdn] = count + 1
×
302
                } else {
×
303
                        fabricPathDnCount[fabricpathdn] = 1
×
304
                }
×
305
                if fabricPathDnCount[fabricpathdn] >= maxCount {
×
306
                        maxCount = fabricPathDnCount[fabricpathdn]
×
307
                        fabricPathDn = fabricpathdn
×
308
                }
×
309
        }
310
        return maxCount, fabricPathDn
×
311
}
312

313
func deleteDevicesFromList(delDevices, devices apicapi.ApicSlice) apicapi.ApicSlice {
×
314
        var newDevices apicapi.ApicSlice
×
315
        for _, device := range devices {
×
316
                found := false
×
317
                for _, delDev := range delDevices {
×
318
                        if reflect.DeepEqual(delDev, device) {
×
319
                                found = true
×
320
                        }
×
321
                }
322
                if !found {
×
323
                        newDevices = append(newDevices, device)
×
324
                }
×
325
        }
326
        return newDevices
×
327
}
328

329
func (cont *AciController) getAciPodSubnet(pod string) (string, error) {
×
330
        podslice := strings.Split(pod, "-")
×
331
        if len(podslice) < 2 {
×
332
                return "", fmt.Errorf("Failed to get podid from pod")
×
333
        }
×
334
        podid := podslice[1]
×
335
        var subnet string
×
336
        args := []string{
×
337
                "query-target=self",
×
338
        }
×
339
        url := fmt.Sprintf("/api/node/mo/uni/controller/setuppol/setupp-%s.json?%s", podid, strings.Join(args, "&"))
×
340
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
341
        if err != nil {
×
342
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
343
                return subnet, err
×
344
        }
×
345
        for _, obj := range apicresp.Imdata {
×
346
                for _, body := range obj {
×
347
                        tepPool, ok := body.Attributes["tepPool"].(string)
×
348
                        if ok {
×
349
                                subnet = tepPool
×
350
                                break
×
351
                        }
352
                }
353
        }
354
        return subnet, nil
×
355
}
356

357
func (cont *AciController) isSingleOpflexOdev(fabricPathDn string) (bool, error) {
×
358
        pathSlice := strings.Split(fabricPathDn, "/")
×
359
        if len(pathSlice) > 2 {
×
360
                path := pathSlice[2]
×
361
                // topology/<aci_pod_name>/paths-<id>/pathep-<iface> - fabricPathDn if
×
362
                // host is connnected via single link
×
363
                // topology/<aci_pod_name>/protpaths-<id_1>-<id_2>/pathep-<iface> - fabricPathDn if
×
364
                // host is connected to vpc pair
×
365
                if strings.Contains(path, "protpaths-") {
×
366
                        args := []string{
×
367
                                "query-target=self",
×
368
                        }
×
369
                        url := fmt.Sprintf("/api/node/class/vpcDom.json?%s", strings.Join(args, "&"))
×
370
                        apicresp, err := cont.apicConn.GetApicResponse(url)
×
371
                        if err != nil {
×
372
                                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
373
                                return false, err
×
374
                        }
×
375
                        nodeIdSlice := strings.Split(path, "-")
×
376
                        if len(nodeIdSlice) != 3 {
×
377
                                cont.log.Error("Invalid fabricPathDn ", fabricPathDn)
×
378
                                return false, fmt.Errorf("Invalid path in fabricPathDn %s", fabricPathDn)
×
379
                        }
×
380
                        // As host is connected to vpc pair, check if the status of any of the vpcDom is down
381
                        upCount := 0
×
382
                        for i, nodeId := range nodeIdSlice {
×
383
                                instDn := fmt.Sprintf("topology/%s/node-%s/sys/vpc/inst", pathSlice[1], nodeId)
×
384
                                if i == 0 {
×
385
                                        continue
×
386
                                }
387
                                for _, obj := range apicresp.Imdata {
×
388
                                        for _, body := range obj {
×
389
                                                dn, ok := body.Attributes["dn"].(string)
×
390
                                                if ok && strings.Contains(dn, instDn) {
×
391
                                                        peerSt, ok := body.Attributes["peerSt"].(string)
×
392
                                                        if ok && peerSt == "up" {
×
393
                                                                upCount++
×
394
                                                        }
×
395
                                                }
396
                                        }
397
                                }
398
                        }
399
                        if upCount < 2 {
×
400
                                return true, nil
×
401
                        }
×
402
                        return false, nil
×
403
                } else {
×
404
                        return true, nil
×
405
                }
×
406
        }
407
        return false, fmt.Errorf("Invalid fabricPathDn %s ", fabricPathDn)
×
408
}
409

410
func (cont *AciController) createAciPodAnnotation(node string) (aciPodAnnot, error) {
×
411
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
412
        nodeAciPodAnnot := cont.nodeACIPod[node]
×
413
        isSingleOdev := false
×
414
        if odevCount == 1 {
×
415
                var err error
×
416
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
417
                if err != nil {
×
418
                        cont.log.Error(err)
×
419
                        return nodeAciPodAnnot, err
×
420
                }
×
421
        }
422
        if (odevCount == 0) ||
×
423
                (odevCount == 1 && !isSingleOdev) {
×
424
                if nodeAciPodAnnot.aciPod != "none" {
×
425
                        if nodeAciPodAnnot.disconnectTime.IsZero() {
×
426
                                nodeAciPodAnnot.disconnectTime = time.Now()
×
427
                        } else {
×
428
                                currentTime := time.Now()
×
429
                                diff := currentTime.Sub(nodeAciPodAnnot.disconnectTime)
×
430
                                if diff.Seconds() > float64(cont.config.OpflexDeviceReconnectWaitTimeout) {
×
431
                                        nodeAciPodAnnot.aciPod = "none"
×
432
                                        nodeAciPodAnnot.disconnectTime = time.Time{}
×
433
                                }
×
434
                        }
435
                }
436
                return nodeAciPodAnnot, nil
×
437
        } else if (odevCount == 2) ||
×
438
                (odevCount == 1 && isSingleOdev) {
×
439
                // when there is already a connected opflex device,
×
440
                // fabricPathDn will have latest pod iformation
×
441
                // and annotation will be in the form pod-<podid>-<subnet of pod>
×
442
                nodeAciPodAnnot.disconnectTime = time.Time{}
×
443
                nodeAciPod := nodeAciPodAnnot.aciPod
×
444
                pathSlice := strings.Split(fabricPathDn, "/")
×
445
                if len(pathSlice) > 1 {
×
446
                        pod := pathSlice[1]
×
447

×
448
                        // when there is difference in pod info avaliable from fabricPathDn
×
449
                        // and what we have in cache, update info in cache and change annotation on node
×
450
                        if !strings.Contains(nodeAciPod, pod) {
×
451
                                subnet, err := cont.getAciPodSubnet(pod)
×
452
                                if err != nil {
×
453
                                        cont.log.Error("Failed to get subnet of aci pod ", err.Error())
×
454
                                        return nodeAciPodAnnot, err
×
455
                                } else {
×
456
                                        nodeAciPodAnnot.aciPod = pod + "-" + subnet
×
457
                                        return nodeAciPodAnnot, nil
×
458
                                }
×
459
                        } else {
×
460
                                return nodeAciPodAnnot, nil
×
461
                        }
×
462
                } else {
×
463
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
464
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
465
                }
×
466
        }
467
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
468
}
469

470
func (cont *AciController) createNodeAciPodAnnotation(node string) (aciPodAnnot, error) {
×
471
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
472
        nodeAciPodAnnot := cont.nodeACIPodAnnot[node]
×
473
        isSingleOdev := false
×
474
        if odevCount == 1 {
×
475
                var err error
×
476
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
477
                if err != nil {
×
478
                        cont.log.Error(err)
×
479
                        return nodeAciPodAnnot, err
×
480
                }
×
481
        }
482
        if (odevCount == 0) ||
×
483
                (odevCount == 1 && !isSingleOdev) {
×
484
                if nodeAciPodAnnot.aciPod != "none" {
×
485
                        nodeAciPodAnnot.aciPod = "none"
×
486
                }
×
487
                return nodeAciPodAnnot, nil
×
488
        } else if (odevCount == 2) ||
×
489
                (odevCount == 1 && isSingleOdev) {
×
490
                pathSlice := strings.Split(fabricPathDn, "/")
×
491
                if len(pathSlice) > 1 {
×
492

×
493
                        nodeAciPodAnnot.aciPod = pathSlice[1]
×
494
                        return nodeAciPodAnnot, nil
×
495
                } else {
×
496
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
497
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
498
                }
×
499
        }
500
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
501
}
502

503
func (cont *AciController) checkChangeOfOpflexOdevAciPod() {
×
504
        var nodeAnnotationUpdates []string
×
505
        cont.apicConn.SyncMutex.Lock()
×
506
        syncDone := cont.apicConn.SyncDone
×
507
        cont.apicConn.SyncMutex.Unlock()
×
508

×
509
        if !syncDone {
×
510
                return
×
511
        }
×
512

513
        cont.indexMutex.Lock()
×
514
        for node := range cont.nodeACIPodAnnot {
×
515
                annot, err := cont.createNodeAciPodAnnotation(node)
×
516
                if err != nil {
×
517
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
518
                                now := time.Now()
×
519
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
520
                                        annot.lastErrorTime = now
×
521
                                        cont.nodeACIPodAnnot[node] = annot
×
522
                                        cont.log.Error(err.Error())
×
523
                                }
×
524
                        } else {
×
525
                                cont.log.Error(err.Error())
×
526
                        }
×
527
                } else {
×
528
                        if annot != cont.nodeACIPodAnnot[node] {
×
529
                                cont.nodeACIPodAnnot[node] = annot
×
530
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
531
                        }
×
532
                }
533
        }
534
        cont.indexMutex.Unlock()
×
535
        if len(nodeAnnotationUpdates) > 0 {
×
536
                for _, updatednode := range nodeAnnotationUpdates {
×
537
                        go cont.env.NodeAnnotationChanged(updatednode)
×
538
                }
×
539
        }
540
}
541

542
func (cont *AciController) checkChangeOfOdevAciPod() {
×
543
        var nodeAnnotationUpdates []string
×
544
        cont.apicConn.SyncMutex.Lock()
×
545
        syncDone := cont.apicConn.SyncDone
×
546
        cont.apicConn.SyncMutex.Unlock()
×
547

×
548
        if !syncDone {
×
549
                return
×
550
        }
×
551

552
        cont.indexMutex.Lock()
×
553
        for node := range cont.nodeACIPod {
×
554
                annot, err := cont.createAciPodAnnotation(node)
×
555
                if err != nil {
×
556
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
557
                                now := time.Now()
×
558
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
559
                                        annot.lastErrorTime = now
×
560
                                        cont.nodeACIPod[node] = annot
×
561
                                        cont.log.Error(err.Error())
×
562
                                }
×
563
                        } else {
×
564
                                cont.log.Error(err.Error())
×
565
                        }
×
566
                } else {
×
567
                        if annot != cont.nodeACIPod[node] {
×
568
                                cont.nodeACIPod[node] = annot
×
569
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
570
                        }
×
571
                }
572
        }
573
        cont.indexMutex.Unlock()
×
574
        if len(nodeAnnotationUpdates) > 0 {
×
575
                for _, updatednode := range nodeAnnotationUpdates {
×
576
                        go cont.env.NodeAnnotationChanged(updatednode)
×
577
                }
×
578
        }
579
}
580

581
func (cont *AciController) deleteOldOpflexDevices() {
1✔
582
        var nodeUpdates []string
1✔
583
        cont.indexMutex.Lock()
1✔
584
        for node, devices := range cont.nodeOpflexDevice {
1✔
585
                var delDevices apicapi.ApicSlice
×
586
                fabricPathDn := cont.getActiveFabricPathDn(node)
×
587
                if fabricPathDn != "" {
×
588
                        for _, device := range devices {
×
589
                                if device.GetAttrStr("delete") == "true" && device.GetAttrStr("fabricPathDn") != fabricPathDn {
×
590
                                        deleteTimeStr := device.GetAttrStr("deleteTime")
×
591
                                        deleteTime, err := time.Parse(time.RFC3339, deleteTimeStr)
×
592
                                        if err != nil {
×
593
                                                cont.log.Error("Failed to parse opflex device delete time: ", err)
×
594
                                                continue
×
595
                                        }
596
                                        now := time.Now()
×
597
                                        diff := now.Sub(deleteTime)
×
598
                                        if diff.Seconds() >= cont.config.OpflexDeviceDeleteTimeout {
×
599
                                                delDevices = append(delDevices, device)
×
600
                                        }
×
601
                                }
602
                        }
603
                        if len(delDevices) > 0 {
×
604
                                newDevices := deleteDevicesFromList(delDevices, devices)
×
605
                                cont.nodeOpflexDevice[node] = newDevices
×
606
                                cont.log.Info("Opflex device list for node ", node, " after deleting stale entries: ", cont.nodeOpflexDevice[node])
×
607
                                if len(newDevices) == 0 {
×
608
                                        delete(cont.nodeOpflexDevice, node)
×
609
                                }
×
610
                                nodeUpdates = append(nodeUpdates, node)
×
611
                        }
612
                }
613
        }
614
        cont.indexMutex.Unlock()
1✔
615
        if len(nodeUpdates) > 0 {
1✔
616
                cont.postOpflexDeviceDelete(nodeUpdates)
×
617
        }
×
618
}
619

620
// must have index lock
621
func (cont *AciController) setDeleteFlagForOldDevices(node, fabricPathDn string) {
1✔
622
        for _, device := range cont.nodeOpflexDevice[node] {
2✔
623
                if device.GetAttrStr("fabricPathDn") != fabricPathDn {
1✔
624
                        t := time.Now()
×
625
                        device.SetAttr("delete", "true")
×
626
                        device.SetAttr("deleteTime", t.Format(time.RFC3339))
×
627
                }
×
628
        }
629
}
630

631
// must have index lock
632
func (cont *AciController) fabricPathForNode(name string) (string, bool) {
1✔
633
        sz := len(cont.nodeOpflexDevice[name])
1✔
634
        for i := range cont.nodeOpflexDevice[name] {
2✔
635
                device := cont.nodeOpflexDevice[name][sz-1-i]
1✔
636
                deviceState := device.GetAttrStr("state")
1✔
637
                if deviceState == "connected" {
2✔
638
                        if deviceState != device.GetAttrStr("prevState") {
2✔
639
                                cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Info("Processing fabric path for node ",
1✔
640
                                        "when connected device state is found")
1✔
641
                                device.SetAttr("prevState", deviceState)
1✔
642
                        }
1✔
643
                        fabricPathDn := device.GetAttrStr("fabricPathDn")
1✔
644
                        cont.setDeleteFlagForOldDevices(name, fabricPathDn)
1✔
645
                        return fabricPathDn, true
1✔
646
                } else {
1✔
647
                        device.SetAttr("prevState", deviceState)
1✔
648
                }
1✔
649
        }
650
        if sz > 0 {
2✔
651
                // When the opflex-device for a node changes, for example during a live migration,
1✔
652
                // we end up with both the old and the new device objects till the old object
1✔
653
                // ages out on APIC. The new object is at end of the devices list (see opflexDeviceChanged),
1✔
654
                // so we return the fabricPathDn of the last opflex-device.
1✔
655
                cont.fabricPathLogger(cont.nodeOpflexDevice[name][sz-1].GetAttrStr("hostName"),
1✔
656
                        cont.nodeOpflexDevice[name][sz-1]).Info("Processing fabricPathDn for node")
1✔
657
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("fabricPathDn"), true
1✔
658
        }
1✔
659
        return "", false
1✔
660
}
661

662
// must have index lock
663
func (cont *AciController) deviceMacForNode(name string) (string, bool) {
1✔
664
        sz := len(cont.nodeOpflexDevice[name])
1✔
665
        if sz > 0 {
2✔
666
                // When the opflex-device for a node changes, for example when the
1✔
667
                // node is recreated, we end up with both the old and the new
1✔
668
                // device objects till the old object ages out on APIC. The
1✔
669
                // new object is at end of the devices list (see
1✔
670
                // opflexDeviceChanged), so we return the MAC address of the
1✔
671
                // last opflex-device.
1✔
672
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("mac"), true
1✔
673
        }
1✔
674
        return "", false
1✔
675
}
676

677
func apicRedirectDst(rpDn string, ip string, mac string,
678
        descr string, healthGroupDn string, enablePbrTracking bool) apicapi.ApicObject {
1✔
679
        dst := apicapi.NewVnsRedirectDest(rpDn, ip, mac).SetAttr("descr", descr)
1✔
680
        if healthGroupDn != "" && enablePbrTracking {
2✔
681
                dst.AddChild(apicapi.NewVnsRsRedirectHealthGroup(dst.GetDn(),
1✔
682
                        healthGroupDn))
1✔
683
        }
1✔
684
        return dst
1✔
685
}
686

687
func (cont *AciController) apicRedirectPol(name string, tenantName string, nodes []string,
688
        nodeMap map[string]*metadata.ServiceEndpoint,
689
        monPolDn string, enablePbrTracking bool) (apicapi.ApicObject, string) {
1✔
690
        rp := apicapi.NewVnsSvcRedirectPol(tenantName, name)
1✔
691
        rp.SetAttr("thresholdDownAction", "deny")
1✔
692
        if cont.config.DisableResilientHashing {
1✔
693
                rp.SetAttr("resilientHashEnabled", "no")
×
694
        }
×
695
        rpDn := rp.GetDn()
1✔
696
        for _, node := range nodes {
2✔
697
                cont.indexMutex.Lock()
1✔
698
                serviceEp, ok := nodeMap[node]
1✔
699
                if !ok {
1✔
700
                        continue
×
701
                }
702
                if serviceEp.Ipv4 != nil {
2✔
703
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv4.String(),
1✔
704
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
1✔
705
                }
1✔
706
                if serviceEp.Ipv6 != nil {
1✔
707
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv6.String(),
×
708
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
×
709
                }
×
710
                cont.indexMutex.Unlock()
1✔
711
        }
712
        if monPolDn != "" && enablePbrTracking {
2✔
713
                rp.AddChild(apicapi.NewVnsRsIPSLAMonitoringPol(rpDn, monPolDn))
1✔
714
        }
1✔
715
        return rp, rpDn
1✔
716
}
717

718
func apicExtNetCreate(enDn string, ingress string, ipv4 bool,
719
        cidr bool, sharedSec bool) apicapi.ApicObject {
1✔
720
        if !cidr {
2✔
721
                if ipv4 {
2✔
722
                        ingress += "/32"
1✔
723
                } else {
1✔
724
                        ingress += "/128"
×
725
                }
×
726
        }
727
        subnet := apicapi.NewL3extSubnet(enDn, ingress, "", "")
1✔
728
        if sharedSec {
2✔
729
                subnet.SetAttr("scope", "import-security,shared-security")
1✔
730
        }
1✔
731
        return subnet
1✔
732
}
733

734
func apicExtNet(name string, tenantName string, l3Out string,
735
        ingresses []string, sharedSecurity bool, snat bool) apicapi.ApicObject {
1✔
736
        en := apicapi.NewL3extInstP(tenantName, l3Out, name)
1✔
737
        enDn := en.GetDn()
1✔
738
        if snat {
2✔
739
                en.AddChild(apicapi.NewFvRsCons(enDn, name))
1✔
740
        } else {
2✔
741
                en.AddChild(apicapi.NewFvRsProv(enDn, name))
1✔
742
        }
1✔
743

744
        for _, ingress := range ingresses {
2✔
745
                ip, _, _ := net.ParseCIDR(ingress)
1✔
746
                // If ingress is a subnet
1✔
747
                if ip != nil {
2✔
748
                        if ip != nil && ip.To4() != nil {
2✔
749
                                subnet := apicExtNetCreate(enDn, ingress, true, true, sharedSecurity)
1✔
750
                                en.AddChild(subnet)
1✔
751
                        } else if ip != nil && ip.To16() != nil {
1✔
752
                                subnet := apicExtNetCreate(enDn, ingress, false, true, sharedSecurity)
×
753
                                en.AddChild(subnet)
×
754
                        }
×
755
                } else {
1✔
756
                        // If ingress is an IP address
1✔
757
                        ip := net.ParseIP(ingress)
1✔
758
                        if ip != nil && ip.To4() != nil {
2✔
759
                                subnet := apicExtNetCreate(enDn, ingress, true, false, sharedSecurity)
1✔
760
                                en.AddChild(subnet)
1✔
761
                        } else if ip != nil && ip.To16() != nil {
1✔
762
                                subnet := apicExtNetCreate(enDn, ingress, false, false, sharedSecurity)
×
763
                                en.AddChild(subnet)
×
764
                        }
×
765
                }
766
        }
767
        return en
1✔
768
}
769

770
func apicDefaultEgCons(conName string, tenantName string,
771
        appProfile string, epg string) apicapi.ApicObject {
×
772
        enDn := fmt.Sprintf("uni/tn-%s/ap-%s/epg-%s", tenantName, appProfile, epg)
×
773
        return apicapi.NewFvRsCons(enDn, conName)
×
774
}
×
775

776
func apicExtNetCons(conName string, tenantName string,
777
        l3Out string, net string) apicapi.ApicObject {
1✔
778
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
779
        return apicapi.NewFvRsCons(enDn, conName)
1✔
780
}
1✔
781

782
func apicExtNetProv(conName string, tenantName string,
783
        l3Out string, net string) apicapi.ApicObject {
1✔
784
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
785
        return apicapi.NewFvRsProv(enDn, conName)
1✔
786
}
1✔
787

788
// stringInSlice reports whether str is equal to at least one element of
// list. It returns false for a nil or empty list.
func stringInSlice(str string, list []string) bool {
	for idx := 0; idx < len(list); idx++ {
		if list[idx] != str {
			continue
		}
		return true
	}
	return false
}
797

798
// validScope reports whether scope is one of the accepted contract scope
// values: the empty string, "context", "tenant" or "global". The comparison
// is exact (case-sensitive).
func validScope(scope string) bool {
	switch scope {
	case "", "context", "tenant", "global":
		return true
	default:
		return false
	}
}
1✔
802

803
func (cont *AciController) getGraphNameFromContract(name, tenantName string) (string, error) {
×
804
        var graphName string
×
805
        args := []string{
×
806
                "query-target=subtree",
×
807
        }
×
808
        url := fmt.Sprintf("/api/node/mo/uni/tn-%s/brc-%s.json?%s", tenantName, name, strings.Join(args, "&"))
×
809
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
810
        if err != nil {
×
811
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
812
                return graphName, err
×
813
        }
×
814
        for _, obj := range apicresp.Imdata {
×
815
                for class, body := range obj {
×
816
                        if class == "vzRsSubjGraphAtt" {
×
817
                                tnVnsAbsGraphName, ok := body.Attributes["tnVnsAbsGraphName"].(string)
×
818
                                if ok {
×
819
                                        graphName = tnVnsAbsGraphName
×
820
                                }
×
821
                                break
×
822
                        }
823
                }
824
        }
825
        cont.log.Debug("graphName: ", graphName)
×
826
        return graphName, err
×
827
}
828

829
func apicContract(conName string, tenantName string,
830
        graphName string, scopeName string, isSnatPbrFltrChain bool,
831
        customSGAnnot bool) apicapi.ApicObject {
1✔
832
        con := apicapi.NewVzBrCP(tenantName, conName)
1✔
833
        if scopeName != "" && scopeName != "context" {
2✔
834
                con.SetAttr("scope", scopeName)
1✔
835
        }
1✔
836
        cs := apicapi.NewVzSubj(con.GetDn(), "loadbalancedservice")
1✔
837
        csDn := cs.GetDn()
1✔
838
        if isSnatPbrFltrChain {
2✔
839
                cs.SetAttr("revFltPorts", "no")
1✔
840
                inTerm := apicapi.NewVzInTerm(csDn)
1✔
841
                outTerm := apicapi.NewVzOutTerm(csDn)
1✔
842
                inTerm.AddChild(apicapi.NewVzRsInTermGraphAtt(inTerm.GetDn(), graphName))
1✔
843
                inTerm.AddChild(apicapi.NewVzRsFiltAtt(inTerm.GetDn(), conName+"_fromCons-toProv"))
1✔
844
                outTerm.AddChild(apicapi.NewVzRsOutTermGraphAtt(outTerm.GetDn(), graphName))
1✔
845
                outTerm.AddChild(apicapi.NewVzRsFiltAtt(outTerm.GetDn(), conName+"_fromProv-toCons"))
1✔
846
                cs.AddChild(inTerm)
1✔
847
                cs.AddChild(outTerm)
1✔
848
        } else {
2✔
849
                cs.AddChild(apicapi.NewVzRsSubjGraphAtt(csDn, graphName, customSGAnnot))
1✔
850
                cs.AddChild(apicapi.NewVzRsSubjFiltAtt(csDn, conName))
1✔
851
        }
1✔
852
        con.AddChild(cs)
1✔
853
        return con
1✔
854
}
855

856
func apicDevCtx(name string, tenantName string,
857
        graphName string, deviceName string, bdName string, rpDn string, isSnatPbrFltrChain bool) apicapi.ApicObject {
1✔
858
        cc := apicapi.NewVnsLDevCtx(tenantName, name, graphName, "loadbalancer")
1✔
859
        ccDn := cc.GetDn()
1✔
860
        graphDn := fmt.Sprintf("uni/tn-%s/lDevVip-%s", tenantName, deviceName)
1✔
861
        lifDn := fmt.Sprintf("%s/lIf-%s", graphDn, "interface")
1✔
862
        bdDn := fmt.Sprintf("uni/tn-%s/BD-%s", tenantName, bdName)
1✔
863
        cc.AddChild(apicapi.NewVnsRsLDevCtxToLDev(ccDn, graphDn))
1✔
864
        rpDnBase := rpDn
1✔
865
        for _, ctxConn := range []string{"consumer", "provider"} {
2✔
866
                lifCtx := apicapi.NewVnsLIfCtx(ccDn, ctxConn)
1✔
867
                if isSnatPbrFltrChain {
2✔
868
                        if ctxConn == "consumer" {
2✔
869
                                rpDn = rpDnBase + "_Cons"
1✔
870
                        } else {
2✔
871
                                rpDn = rpDnBase + "_Prov"
1✔
872
                        }
1✔
873
                }
874
                lifCtxDn := lifCtx.GetDn()
1✔
875
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToSvcRedirectPol(lifCtxDn, rpDn))
1✔
876
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToBD(lifCtxDn, bdDn))
1✔
877
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToLIf(lifCtxDn, lifDn))
1✔
878
                cc.AddChild(lifCtx)
1✔
879
        }
880
        return cc
1✔
881
}
882

883
func apicFilterEntry(filterDn string, count string, p_start string,
884
        p_end string, protocol string, stateful string, snat bool, outTerm bool) apicapi.ApicObject {
1✔
885
        fe := apicapi.NewVzEntry(filterDn, count)
1✔
886
        fe.SetAttr("etherT", "ip")
1✔
887
        fe.SetAttr("prot", protocol)
1✔
888
        if snat {
2✔
889
                if outTerm {
2✔
890
                        if protocol == "tcp" {
2✔
891
                                fe.SetAttr("tcpRules", "est")
1✔
892
                        }
1✔
893
                        // Reverse the ports for outTerm
894
                        fe.SetAttr("dFromPort", p_start)
1✔
895
                        fe.SetAttr("dToPort", p_end)
1✔
896
                } else {
1✔
897
                        fe.SetAttr("sFromPort", p_start)
1✔
898
                        fe.SetAttr("sToPort", p_end)
1✔
899
                }
1✔
900
        } else {
1✔
901
                fe.SetAttr("dFromPort", p_start)
1✔
902
                fe.SetAttr("dToPort", p_end)
1✔
903
        }
1✔
904
        fe.SetAttr("stateful", stateful)
1✔
905
        return fe
1✔
906
}
907
func apicFilter(name string, tenantName string,
908
        portSpec []v1.ServicePort, snat bool, snatRange portRangeSnat) apicapi.ApicObject {
1✔
909
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
910
        filterDn := filter.GetDn()
1✔
911

1✔
912
        var i int
1✔
913
        var port v1.ServicePort
1✔
914
        for i, port = range portSpec {
2✔
915
                pstr := strconv.Itoa(int(port.Port))
1✔
916
                proto := getProtocolStr(port.Protocol)
1✔
917
                fe := apicFilterEntry(filterDn, strconv.Itoa(i), pstr,
1✔
918
                        pstr, proto, "no", false, false)
1✔
919
                filter.AddChild(fe)
1✔
920
        }
1✔
921

922
        if snat {
1✔
923
                portSpec := []portRangeSnat{snatRange}
×
924
                p_start := strconv.Itoa(portSpec[0].start)
×
925
                p_end := strconv.Itoa(portSpec[0].end)
×
926

×
927
                fe1 := apicFilterEntry(filterDn, strconv.Itoa(i+1), p_start,
×
928
                        p_end, "tcp", "no", false, false)
×
929
                filter.AddChild(fe1)
×
930
                fe2 := apicFilterEntry(filterDn, strconv.Itoa(i+2), p_start,
×
931
                        p_end, "udp", "no", false, false)
×
932
                filter.AddChild(fe2)
×
933
        }
×
934
        return filter
1✔
935
}
936

937
func apicFilterSnat(name string, tenantName string,
938
        portSpec []portRangeSnat, outTerm bool) apicapi.ApicObject {
1✔
939
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
940
        filterDn := filter.GetDn()
1✔
941

1✔
942
        p_start := strconv.Itoa(portSpec[0].start)
1✔
943
        p_end := strconv.Itoa(portSpec[0].end)
1✔
944

1✔
945
        fe := apicFilterEntry(filterDn, "0", p_start,
1✔
946
                p_end, "tcp", "no", true, outTerm)
1✔
947
        filter.AddChild(fe)
1✔
948
        fe1 := apicFilterEntry(filterDn, "1", p_start,
1✔
949
                p_end, "udp", "no", true, outTerm)
1✔
950
        filter.AddChild(fe1)
1✔
951

1✔
952
        return filter
1✔
953
}
1✔
954

955
func (cont *AciController) updateServiceDeviceInstance(key string,
956
        service *v1.Service) error {
1✔
957
        cont.indexMutex.Lock()
1✔
958
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
959
        cont.serviceEndPoints.GetnodesMetadata(key, service, nodeMap)
1✔
960
        cont.indexMutex.Unlock()
1✔
961

1✔
962
        var nodes []string
1✔
963
        for node := range nodeMap {
2✔
964
                nodes = append(nodes, node)
1✔
965
        }
1✔
966
        sort.Strings(nodes)
1✔
967
        name := cont.aciNameForKey("svc", key)
1✔
968
        var conScope string
1✔
969
        scopeVal, ok := service.ObjectMeta.Annotations[metadata.ServiceContractScopeAnnotation]
1✔
970
        if ok {
2✔
971
                normScopeVal := strings.ToLower(scopeVal)
1✔
972
                if !validScope(normScopeVal) {
1✔
973
                        errString := "Invalid service contract scope value provided " + scopeVal
×
974
                        err := errors.New(errString)
×
975
                        serviceLogger(cont.log, service).Error("Could not create contract: ", err)
×
976
                        return err
×
977
                } else {
1✔
978
                        conScope = normScopeVal
1✔
979
                }
1✔
980
        } else {
1✔
981
                conScope = DefaultServiceContractScope
1✔
982
        }
1✔
983

984
        var sharedSecurity bool
1✔
985
        if conScope == "global" {
2✔
986
                sharedSecurity = true
1✔
987
        } else {
2✔
988
                sharedSecurity = DefaultServiceExtSubNetShared
1✔
989
        }
1✔
990

991
        graphName := cont.aciNameForKey("svc", "global")
1✔
992
        deviceName := cont.aciNameForKey("svc", "global")
1✔
993
        _, customSGAnnPresent := service.ObjectMeta.Annotations[metadata.ServiceGraphNameAnnotation]
1✔
994
        if customSGAnnPresent {
1✔
995
                customSG, err := cont.getGraphNameFromContract(name, cont.config.AciVrfTenant)
×
996
                if err == nil {
×
997
                        graphName = customSG
×
998
                }
×
999
        }
1000
        cont.log.Debug("Using service graph ", graphName, " for service ", key)
1✔
1001

1✔
1002
        var serviceObjs apicapi.ApicSlice
1✔
1003
        if len(nodes) > 0 {
2✔
1004
                // 1. Service redirect policy
1✔
1005
                // The service redirect policy contains the MAC address
1✔
1006
                // and IP address of each of the service endpoints for
1✔
1007
                // each node that hosts a pod for this service.  The
1✔
1008
                // example below shows the case of two nodes.
1✔
1009
                rp, rpDn :=
1✔
1010
                        cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
1✔
1011
                                nodeMap, cont.staticMonPolDn(), cont.config.AciPbrTrackingNonSnat)
1✔
1012
                serviceObjs = append(serviceObjs, rp)
1✔
1013

1✔
1014
                // 2. Service graph contract and external network
1✔
1015
                // The service graph contract must be bound to the service
1✔
1016
                // graph.  This contract must be consumed by the default
1✔
1017
                // layer 3 network and provided by the service layer 3
1✔
1018
                // network.
1✔
1019
                {
2✔
1020
                        var ingresses []string
1✔
1021
                        for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1022
                                ingresses = append(ingresses, ingress.IP)
1✔
1023
                        }
1✔
1024
                        serviceObjs = append(serviceObjs,
1✔
1025
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1026
                                        cont.config.AciL3Out, ingresses, sharedSecurity, false))
1✔
1027
                }
1028

1029
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, false, customSGAnnPresent)
1✔
1030
                serviceObjs = append(serviceObjs, contract)
1✔
1031
                for _, net := range cont.config.AciExtNetworks {
2✔
1032
                        serviceObjs = append(serviceObjs,
1✔
1033
                                apicExtNetCons(name, cont.config.AciVrfTenant,
1✔
1034
                                        cont.config.AciL3Out, net))
1✔
1035
                }
1✔
1036

1037
                if cont.config.AddExternalContractToDefaultEPG && service.Spec.Type == v1.ServiceTypeLoadBalancer {
1✔
1038
                        defaultEpgTenant := cont.config.DefaultEg.PolicySpace
×
1039
                        defaultEpgStringSplit := strings.Split(cont.config.DefaultEg.Name, "|")
×
1040
                        var defaultEpgName, appProfile string
×
1041
                        if len(defaultEpgStringSplit) > 1 {
×
1042
                                appProfile = defaultEpgStringSplit[0]
×
1043
                                defaultEpgName = defaultEpgStringSplit[1]
×
1044
                        } else {
×
1045
                                appProfile = cont.config.AppProfile
×
1046
                                defaultEpgName = defaultEpgStringSplit[0]
×
1047
                        }
×
1048
                        serviceObjs = append(serviceObjs,
×
1049
                                apicDefaultEgCons(name, defaultEpgTenant, appProfile, defaultEpgName))
×
1050
                }
1051

1052
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1053
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1054

1✔
1055
                _, snat := cont.snatServices[key]
1✔
1056
                filter := apicFilter(name, cont.config.AciVrfTenant,
1✔
1057
                        service.Spec.Ports, snat, defaultPortRange)
1✔
1058
                serviceObjs = append(serviceObjs, filter)
1✔
1059

1✔
1060
                // 3. Device cluster context
1✔
1061
                // The logical device context binds the service contract
1✔
1062
                // to the redirect policy and the device cluster and
1✔
1063
                // bridge domain for the device cluster.
1✔
1064
                serviceObjs = append(serviceObjs,
1✔
1065
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, deviceName,
1✔
1066
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, false))
1✔
1067
        }
1068

1069
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1070
        return nil
1✔
1071
}
1072

1073
func (cont *AciController) updateServiceDeviceInstanceSnat(key string) error {
1✔
1074
        nodeList := cont.nodeIndexer.List()
1✔
1075
        cont.indexMutex.Lock()
1✔
1076
        if len(cont.nodeServiceMetaCache) == 0 {
2✔
1077
                cont.indexMutex.Unlock()
1✔
1078
                return nil
1✔
1079
        }
1✔
1080
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
1081
        sort.Slice(nodeList, func(i, j int) bool {
2✔
1082
                nodeA := nodeList[i].(*v1.Node)
1✔
1083
                nodeB := nodeList[j].(*v1.Node)
1✔
1084
                return nodeA.ObjectMeta.Name < nodeB.ObjectMeta.Name
1✔
1085
        })
1✔
1086
        for itr, nodeItem := range nodeList {
2✔
1087
                if itr == cont.config.MaxSvcGraphNodes {
1✔
1088
                        break
×
1089
                }
1090
                node := nodeItem.(*v1.Node)
1✔
1091
                nodeName := node.ObjectMeta.Name
1✔
1092
                nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
1093
                if !ok {
2✔
1094
                        continue
1✔
1095
                }
1096
                _, ok = cont.fabricPathForNode(nodeName)
1✔
1097
                if !ok {
1✔
1098
                        continue
×
1099
                }
1100
                nodeLabels := node.ObjectMeta.Labels
1✔
1101
                excludeNode := cont.nodeLabelsInExcludeList(nodeLabels)
1✔
1102
                if excludeNode {
1✔
1103
                        continue
×
1104
                }
1105
                nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
1106
        }
1107
        cont.indexMutex.Unlock()
1✔
1108

1✔
1109
        var nodes []string
1✔
1110
        for node := range nodeMap {
2✔
1111
                nodes = append(nodes, node)
1✔
1112
        }
1✔
1113
        sort.Strings(nodes)
1✔
1114
        name := cont.aciNameForKey("snat", key)
1✔
1115
        var conScope = cont.config.SnatSvcContractScope
1✔
1116
        sharedSecurity := true
1✔
1117

1✔
1118
        graphName := cont.aciNameForKey("svc", "global")
1✔
1119
        var serviceObjs apicapi.ApicSlice
1✔
1120
        if len(nodes) > 0 {
2✔
1121
                // 1. Service redirect policy
1✔
1122
                // The service redirect policy contains the MAC address
1✔
1123
                // and IP address of each of the service endpoints for
1✔
1124
                // each node that hosts a pod for this service.
1✔
1125
                // For SNAT with the introduction of filter-chain usage, to work-around
1✔
1126
                // an APIC limitation, creating two PBR policies with same nodes.
1✔
1127
                var rpDn string
1✔
1128
                var rp apicapi.ApicObject
1✔
1129
                if cont.apicConn.SnatPbrFltrChain {
2✔
1130
                        rpCons, rpDnCons :=
1✔
1131
                                cont.apicRedirectPol(name+"_Cons", cont.config.AciVrfTenant, nodes,
1✔
1132
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1133
                        serviceObjs = append(serviceObjs, rpCons)
1✔
1134
                        rpProv, _ :=
1✔
1135
                                cont.apicRedirectPol(name+"_Prov", cont.config.AciVrfTenant, nodes,
1✔
1136
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1137
                        serviceObjs = append(serviceObjs, rpProv)
1✔
1138
                        rpDn = strings.TrimSuffix(rpDnCons, "_Cons")
1✔
1139
                } else {
1✔
1140
                        rp, rpDn =
×
1141
                                cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
×
1142
                                        nodeMap, cont.staticMonPolDn(), true)
×
1143
                        serviceObjs = append(serviceObjs, rp)
×
1144
                }
×
1145
                // 2. Service graph contract and external network
1146
                // The service graph contract must be bound to the
1147
                // service
1148
                // graph.  This contract must be consumed by the default
1149
                // layer 3 network and provided by the service layer 3
1150
                // network.
1151
                {
1✔
1152
                        var ingresses []string
1✔
1153
                        for _, policy := range cont.snatPolicyCache {
2✔
1154
                                ingresses = append(ingresses, policy.SnatIp...)
1✔
1155
                        }
1✔
1156
                        serviceObjs = append(serviceObjs,
1✔
1157
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1158
                                        cont.config.AciL3Out, ingresses, sharedSecurity, true))
1✔
1159
                }
1160

1161
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, cont.apicConn.SnatPbrFltrChain, false)
1✔
1162
                serviceObjs = append(serviceObjs, contract)
1✔
1163

1✔
1164
                for _, net := range cont.config.AciExtNetworks {
2✔
1165
                        serviceObjs = append(serviceObjs,
1✔
1166
                                apicExtNetProv(name, cont.config.AciVrfTenant,
1✔
1167
                                        cont.config.AciL3Out, net))
1✔
1168
                }
1✔
1169

1170
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1171
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1172
                portSpec := []portRangeSnat{defaultPortRange}
1✔
1173
                if cont.apicConn.SnatPbrFltrChain {
2✔
1174
                        filterIn := apicFilterSnat(name+"_fromCons-toProv", cont.config.AciVrfTenant, portSpec, false)
1✔
1175
                        serviceObjs = append(serviceObjs, filterIn)
1✔
1176
                        filterOut := apicFilterSnat(name+"_fromProv-toCons", cont.config.AciVrfTenant, portSpec, true)
1✔
1177
                        serviceObjs = append(serviceObjs, filterOut)
1✔
1178
                } else {
1✔
1179
                        filter := apicFilterSnat(name, cont.config.AciVrfTenant, portSpec, false)
×
1180
                        serviceObjs = append(serviceObjs, filter)
×
1181
                }
×
1182
                // 3. Device cluster context
1183
                // The logical device context binds the service contract
1184
                // to the redirect policy and the device cluster and
1185
                // bridge domain for the device cluster.
1186
                serviceObjs = append(serviceObjs,
1✔
1187
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, graphName,
1✔
1188
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, cont.apicConn.SnatPbrFltrChain))
1✔
1189
        }
1190
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1191
        return nil
1✔
1192
}
1193

1194
func (cont *AciController) nodeLabelsInExcludeList(Labels map[string]string) bool {
1✔
1195
        nodeSnatRedirectExclude := cont.config.NodeSnatRedirectExclude
1✔
1196

1✔
1197
        for _, nodeGroup := range nodeSnatRedirectExclude {
1✔
1198
                if len(nodeGroup.Labels) == 0 {
×
1199
                        continue
×
1200
                }
1201
                matchFound := true
×
1202
                for _, label := range nodeGroup.Labels {
×
1203
                        if _, ok := Labels["node-role.kubernetes.io/"+label]; !ok {
×
1204
                                matchFound = false
×
1205
                                break
×
1206
                        }
1207
                }
1208
                if matchFound {
×
1209
                        return true
×
1210
                }
×
1211
        }
1212
        return false
1✔
1213
}
1214

1215
func (cont *AciController) queueServiceUpdateByKey(key string) {
1✔
1216
        cont.serviceQueue.Add(key)
1✔
1217
}
1✔
1218

1219
func (cont *AciController) queueServiceUpdate(service *v1.Service) {
1✔
1220
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
1221
        if err != nil {
1✔
1222
                serviceLogger(cont.log, service).
×
1223
                        Error("Could not create service key: ", err)
×
1224
                return
×
1225
        }
×
1226
        cont.serviceQueue.Add(key)
1✔
1227
}
1228

1229
func apicDeviceCluster(name string, vrfTenant string,
1230
        physDom string, encap string,
1231
        nodes []string, nodeMap map[string]string) (apicapi.ApicObject, string) {
1✔
1232
        dc := apicapi.NewVnsLDevVip(vrfTenant, name)
1✔
1233
        dc.SetAttr("managed", "no")
1✔
1234
        dcDn := dc.GetDn()
1✔
1235
        dc.AddChild(apicapi.NewVnsRsALDevToPhysDomP(dcDn,
1✔
1236
                fmt.Sprintf("uni/phys-%s", physDom)))
1✔
1237
        lif := apicapi.NewVnsLIf(dcDn, "interface")
1✔
1238
        lif.SetAttr("encap", encap)
1✔
1239
        lifDn := lif.GetDn()
1✔
1240

1✔
1241
        for _, node := range nodes {
2✔
1242
                path, ok := nodeMap[node]
1✔
1243
                if !ok {
1✔
1244
                        continue
×
1245
                }
1246

1247
                cdev := apicapi.NewVnsCDev(dcDn, node)
1✔
1248
                cif := apicapi.NewVnsCif(cdev.GetDn(), "interface")
1✔
1249
                cif.AddChild(apicapi.NewVnsRsCIfPathAtt(cif.GetDn(), path))
1✔
1250
                cdev.AddChild(cif)
1✔
1251
                lif.AddChild(apicapi.NewVnsRsCIfAttN(lifDn, cif.GetDn()))
1✔
1252
                dc.AddChild(cdev)
1✔
1253
        }
1254

1255
        dc.AddChild(lif)
1✔
1256

1✔
1257
        return dc, dcDn
1✔
1258
}
1259

1260
func apicServiceGraph(name string, tenantName string,
1261
        dcDn string) apicapi.ApicObject {
1✔
1262
        sg := apicapi.NewVnsAbsGraph(tenantName, name)
1✔
1263
        sgDn := sg.GetDn()
1✔
1264
        var provDn string
1✔
1265
        var consDn string
1✔
1266
        var cTermDn string
1✔
1267
        var pTermDn string
1✔
1268
        {
2✔
1269
                an := apicapi.NewVnsAbsNode(sgDn, "loadbalancer")
1✔
1270
                an.SetAttr("managed", "no")
1✔
1271
                an.SetAttr("routingMode", "Redirect")
1✔
1272
                anDn := an.GetDn()
1✔
1273
                cons := apicapi.NewVnsAbsFuncConn(anDn, "consumer")
1✔
1274
                consDn = cons.GetDn()
1✔
1275
                an.AddChild(cons)
1✔
1276
                prov := apicapi.NewVnsAbsFuncConn(anDn, "provider")
1✔
1277
                provDn = prov.GetDn()
1✔
1278
                an.AddChild(prov)
1✔
1279
                an.AddChild(apicapi.NewVnsRsNodeToLDev(anDn, dcDn))
1✔
1280
                sg.AddChild(an)
1✔
1281
        }
1✔
1282
        {
1✔
1283
                tnc := apicapi.NewVnsAbsTermNodeCon(sgDn, "T1")
1✔
1284
                tncDn := tnc.GetDn()
1✔
1285
                cTerm := apicapi.NewVnsAbsTermConn(tncDn)
1✔
1286
                cTermDn = cTerm.GetDn()
1✔
1287
                tnc.AddChild(cTerm)
1✔
1288
                tnc.AddChild(apicapi.NewVnsInTerm(tncDn))
1✔
1289
                tnc.AddChild(apicapi.NewVnsOutTerm(tncDn))
1✔
1290
                sg.AddChild(tnc)
1✔
1291
        }
1✔
1292
        {
1✔
1293
                tnp := apicapi.NewVnsAbsTermNodeProv(sgDn, "T2")
1✔
1294
                tnpDn := tnp.GetDn()
1✔
1295
                pTerm := apicapi.NewVnsAbsTermConn(tnpDn)
1✔
1296
                pTermDn = pTerm.GetDn()
1✔
1297
                tnp.AddChild(pTerm)
1✔
1298
                tnp.AddChild(apicapi.NewVnsInTerm(tnpDn))
1✔
1299
                tnp.AddChild(apicapi.NewVnsOutTerm(tnpDn))
1✔
1300
                sg.AddChild(tnp)
1✔
1301
        }
1✔
1302
        {
1✔
1303
                acc := apicapi.NewVnsAbsConnection(sgDn, "C1")
1✔
1304
                acc.SetAttr("connDir", "provider")
1✔
1305
                accDn := acc.GetDn()
1✔
1306
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, consDn))
1✔
1307
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, cTermDn))
1✔
1308
                sg.AddChild(acc)
1✔
1309
        }
1✔
1310
        {
1✔
1311
                acp := apicapi.NewVnsAbsConnection(sgDn, "C2")
1✔
1312
                acp.SetAttr("connDir", "provider")
1✔
1313
                acpDn := acp.GetDn()
1✔
1314
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, provDn))
1✔
1315
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, pTermDn))
1✔
1316
                sg.AddChild(acp)
1✔
1317
        }
1✔
1318
        return sg
1✔
1319
}
1320
func (cont *AciController) updateDeviceCluster() {
1✔
1321
        nodeMap := make(map[string]string)
1✔
1322
        cont.indexMutex.Lock()
1✔
1323
        for node := range cont.nodeOpflexDevice {
2✔
1324
                cont.log.Debug("Processing node in nodeOpflexDevice cache : ", node)
1✔
1325
                fabricPath, ok := cont.fabricPathForNode(node)
1✔
1326
                if !ok {
2✔
1327
                        continue
1✔
1328
                }
1329
                nodeMap[node] = fabricPath
1✔
1330
        }
1331

1332
        // For clusters other than OpenShift On OpenStack,
1333
        // openStackFabricPathDnMap will be empty
1334
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1335
                nodeMap[host] = opflexOdevInfo.fabricPathDn
×
1336
        }
×
1337

1338
        // For OpenShift On OpenStack clusters,
1339
        // hostFabricPathDnMap will be empty
1340
        for _, hostInfo := range cont.hostFabricPathDnMap {
1✔
1341
                if hostInfo.fabricPathDn != "" {
×
1342
                        nodeMap[hostInfo.host] = hostInfo.fabricPathDn
×
1343
                }
×
1344
        }
1345
        cont.indexMutex.Unlock()
1✔
1346

1✔
1347
        var nodes []string
1✔
1348
        for node := range nodeMap {
2✔
1349
                nodes = append(nodes, node)
1✔
1350
        }
1✔
1351
        sort.Strings(nodes)
1✔
1352

1✔
1353
        name := cont.aciNameForKey("svc", "global")
1✔
1354
        var serviceObjs apicapi.ApicSlice
1✔
1355

1✔
1356
        // 1. Device cluster:
1✔
1357
        // The device cluster is a set of physical paths that need to be
1✔
1358
        // created for each node in the cluster, that correspond to the
1✔
1359
        // service interface for each node.
1✔
1360
        dc, dcDn := apicDeviceCluster(name, cont.config.AciVrfTenant,
1✔
1361
                cont.config.AciServicePhysDom, cont.config.AciServiceEncap,
1✔
1362
                nodes, nodeMap)
1✔
1363
        serviceObjs = append(serviceObjs, dc)
1✔
1364

1✔
1365
        // 2. Service graph template
1✔
1366
        // The service graph controls how the traffic will be redirected.
1✔
1367
        // A service graph must be created for each device cluster.
1✔
1368
        serviceObjs = append(serviceObjs,
1✔
1369
                apicServiceGraph(name, cont.config.AciVrfTenant, dcDn))
1✔
1370

1✔
1371
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1372
}
1373

1374
func (cont *AciController) fabricPathLogger(node string,
1375
        obj apicapi.ApicObject) *logrus.Entry {
1✔
1376
        return cont.log.WithFields(logrus.Fields{
1✔
1377
                "fabricPath": obj.GetAttr("fabricPathDn"),
1✔
1378
                "mac":        obj.GetAttr("mac"),
1✔
1379
                "node":       node,
1✔
1380
                "obj":        obj,
1✔
1381
        })
1✔
1382
}
1✔
1383

1384
// getNodeUplinkMac returns the uplink MAC address of any node from the
1385
// opflex.cisco.com/uplink-mac annotation written by hostagent at startup.
NEW
1386
func (cont *AciController) getNodeUplinkMac() string {
×
NEW
1387
        nodeList := cont.nodeIndexer.List()
×
NEW
1388
        for _, obj := range nodeList {
×
NEW
1389
                node := obj.(*v1.Node)
×
NEW
1390
                if mac, ok := node.ObjectMeta.Annotations[metadata.UplinkMacAnnotation]; ok && mac != "" {
×
NEW
1391
                        return mac
×
NEW
1392
                }
×
1393
        }
NEW
1394
        return ""
×
1395
}
1396

1397
func (cont *AciController) setOpenStackSystemId() string {
×
1398

×
NEW
1399
        // 1) get opflexIDEp with mac == <node MAC of any one of the OpenShift nodes>
×
1400
        // 2) extract OpenStack system id from compHvDn attribute
×
1401
        //    comp/prov-OpenStack/ctrlr-[k8s-scale]-k8s-scale/hv-overcloud-novacompute-0 - sample compHvDn,
×
1402
        //    where k8s-scale is the system id
×
1403

×
1404
        var systemId string
×
NEW
1405

×
NEW
1406
        nodeMac := cont.getNodeUplinkMac()
×
NEW
1407

×
NEW
1408
        if nodeMac == "" {
×
NEW
1409
                cont.log.Warning("No node uplink MAC found in node annotations")
×
1410
                return systemId
×
1411
        }
×
1412

NEW
1413
        cont.log.Info("Using node MAC for OpenStack system id lookup: ", nodeMac)
×
NEW
1414

×
NEW
1415
        opflexIDEpFilter := fmt.Sprintf(
×
NEW
1416
                "query-target-filter=and(eq(opflexIDEp.mac,\"%s\"),wcard(opflexIDEp.compHvDn,\"prov-OpenStack\"))",
×
NEW
1417
                nodeMac)
×
NEW
1418
        url := fmt.Sprintf("/api/node/class/opflexIDEp.json?%s", opflexIDEpFilter)
×
1419
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1420
        if err != nil {
×
1421
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1422
                return systemId
×
1423
        }
×
1424
        for _, obj := range apicresp.Imdata {
×
1425
                for _, body := range obj {
×
1426
                        compHvDn, ok := body.Attributes["compHvDn"].(string)
×
1427
                        if ok {
×
1428
                                systemId = compHvDn[strings.IndexByte(compHvDn, '[')+1 : strings.IndexByte(compHvDn, ']')]
×
1429
                                break
×
1430
                        }
1431
                }
1432
        }
1433
        cont.indexMutex.Lock()
×
1434
        cont.openStackSystemId = systemId
×
1435
        cont.log.Info("Setting OpenStack system id : ", cont.openStackSystemId)
×
1436
        cont.indexMutex.Unlock()
×
1437
        return systemId
×
1438
}
1439

1440
// Returns true when a new OpenStack opflexODev is added
1441
func (cont *AciController) openStackOpflexOdevUpdate(obj apicapi.ApicObject) bool {
×
1442

×
NEW
1443
        // If opflexOdev compHvDn contains comp/prov-OpenStack/ctrlr-[<systemid>]-<systemid>,
×
1444
        // it means that it is an OpenStack OpflexOdev which belongs to OpenStack with system id <systemid>
×
1445

×
1446
        var deviceClusterUpdate bool
×
1447
        compHvDn := obj.GetAttrStr("compHvDn")
×
1448
        if strings.Contains(compHvDn, "prov-OpenStack") {
×
1449
                cont.indexMutex.Lock()
×
1450
                systemId := cont.openStackSystemId
×
1451
                cont.indexMutex.Unlock()
×
1452
                if systemId == "" {
×
1453
                        systemId = cont.setOpenStackSystemId()
×
1454
                }
×
1455
                if systemId == "" {
×
1456
                        cont.log.Error("Failed  to get OpenStack system id")
×
1457
                        return deviceClusterUpdate
×
1458
                }
×
1459
                prefix := fmt.Sprintf("comp/prov-OpenStack/ctrlr-[%s]-%s", systemId, systemId)
×
1460
                if strings.Contains(compHvDn, prefix) {
×
1461
                        cont.log.Info("Received notification for OpenStack opflexODev update, hostName: ",
×
1462
                                obj.GetAttrStr("hostName"), " dn: ", obj.GetAttrStr("dn"))
×
1463
                        cont.indexMutex.Lock()
×
1464
                        opflexOdevInfo, ok := cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")]
×
1465
                        if ok {
×
1466
                                opflexOdevInfo.opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1467
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = opflexOdevInfo
×
1468
                        } else {
×
1469
                                var openstackopflexodevinfo openstackOpflexOdevInfo
×
1470
                                opflexODevDn := make(map[string]struct{})
×
1471
                                opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1472
                                openstackopflexodevinfo.fabricPathDn = obj.GetAttrStr("fabricPathDn")
×
1473
                                openstackopflexodevinfo.opflexODevDn = opflexODevDn
×
1474
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = openstackopflexodevinfo
×
1475
                                deviceClusterUpdate = true
×
1476
                        }
×
1477
                        cont.indexMutex.Unlock()
×
1478
                }
1479
        }
1480
        return deviceClusterUpdate
×
1481
}
1482

1483
func (cont *AciController) infraRtAttEntPDeleted(dn string) {
×
1484
        // dn format : uni/infra/attentp-k8s-scale-esxi-aaep/rtattEntP-[uni/infra/funcprof/accbundle-esxi1-vpc-ipg]
×
1485
        cont.log.Info("Processing delete of infraRtAttEntP: ", dn)
×
1486

×
1487
        // extract uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1488
        re := regexp.MustCompile(`\[(.*?)\]`)
×
1489
        matches := re.FindStringSubmatch(dn)
×
1490

×
1491
        if len(matches) < 2 {
×
1492
                cont.log.Error("Failed to extract ipg from dn : ", dn)
×
1493
                return
×
1494
        }
×
1495
        tdn := matches[1]
×
1496

×
1497
        cont.indexMutex.Lock()
×
1498
        _, ok := cont.hostFabricPathDnMap[tdn]
×
1499
        if ok {
×
1500
                delete(cont.hostFabricPathDnMap, tdn)
×
1501
                cont.log.Info("Deleted ipg : ", tdn)
×
1502
        }
×
1503
        cont.indexMutex.Unlock()
×
1504

×
1505
        if ok {
×
1506
                cont.updateDeviceCluster()
×
1507
        }
×
1508
}
1509

1510
func (cont *AciController) vpcIfDeleted(dn string) {
×
1511
        var deleted bool
×
1512
        cont.indexMutex.Lock()
×
1513
        for tDn, hostInfo := range cont.hostFabricPathDnMap {
×
1514
                if _, present := hostInfo.vpcIfDn[dn]; present {
×
1515
                        cont.log.Info("Deleting vpcIf, dn :", dn)
×
1516
                        delete(hostInfo.vpcIfDn, dn)
×
1517
                        if len(hostInfo.vpcIfDn) == 0 {
×
1518
                                cont.log.Infof("Removing fabricPathDn(%s) of ipg : %s ", hostInfo.fabricPathDn, hostInfo.host)
×
1519
                                hostInfo.fabricPathDn = ""
×
1520
                                deleted = true
×
1521
                        }
×
1522
                        cont.hostFabricPathDnMap[tDn] = hostInfo
×
1523
                }
1524
        }
1525
        cont.indexMutex.Unlock()
×
1526
        if deleted {
×
1527
                cont.updateDeviceCluster()
×
1528
        }
×
1529
}
1530

1531
func (cont *AciController) vpcIfChanged(obj apicapi.ApicObject) {
×
1532
        if cont.updateHostFabricPathDnMap(obj) {
×
1533
                cont.updateDeviceCluster()
×
1534
        }
×
1535
}
1536

1537
func (cont *AciController) updateHostFabricPathDnMap(obj apicapi.ApicObject) bool {
×
1538
        var accBndlGrpDn, fabricPathDn, dn string
×
1539
        for _, body := range obj {
×
1540
                var ok bool
×
1541
                accBndlGrpDn, ok = body.Attributes["accBndlGrpDn"].(string)
×
1542
                if !ok || (ok && accBndlGrpDn == "") {
×
1543
                        cont.log.Error("accBndlGrpDn missing/empty in vpcIf")
×
1544
                        return false
×
1545
                }
×
1546
                fabricPathDn, ok = body.Attributes["fabricPathDn"].(string)
×
1547
                if !ok && (ok && fabricPathDn == "") {
×
1548
                        cont.log.Error("fabricPathDn missing/empty in vpcIf")
×
1549
                        return false
×
1550
                }
×
1551
                dn, ok = body.Attributes["dn"].(string)
×
1552
                if !ok && (ok && dn == "") {
×
1553
                        cont.log.Error("dn missing/empty in vpcIf")
×
1554
                        return false
×
1555
                }
×
1556
        }
1557
        var updated bool
×
1558
        cont.indexMutex.Lock()
×
1559
        // If accBndlGrpDn exists in hostFabricPathDnMap, the vpcIf belongs to the cluster AEP
×
1560
        hostInfo, exists := cont.hostFabricPathDnMap[accBndlGrpDn]
×
1561
        if exists {
×
1562
                if _, present := hostInfo.vpcIfDn[dn]; !present {
×
1563
                        hostInfo.vpcIfDn[dn] = struct{}{}
×
1564
                        cont.log.Infof("vpcIf processing, dn : %s, accBndlGrpDn: %s", dn, accBndlGrpDn)
×
1565
                }
×
1566
                if hostInfo.fabricPathDn != fabricPathDn {
×
1567
                        hostInfo.fabricPathDn = fabricPathDn
×
1568
                        cont.log.Info("Updated fabricPathDn of ipg :", hostInfo.host, " to: ", hostInfo.fabricPathDn)
×
1569
                        updated = true
×
1570
                }
×
1571
                cont.hostFabricPathDnMap[accBndlGrpDn] = hostInfo
×
1572
        }
1573
        cont.indexMutex.Unlock()
×
1574
        return updated
×
1575
}
1576

1577
func (cont *AciController) infraRtAttEntPChanged(obj apicapi.ApicObject) {
×
1578
        var tdn string
×
1579
        for _, body := range obj {
×
1580
                var ok bool
×
1581
                tdn, ok = body.Attributes["tDn"].(string)
×
1582
                if !ok || (ok && tdn == "") {
×
1583
                        cont.log.Error("tDn missing/empty in infraRtAttEntP")
×
1584
                        return
×
1585
                }
×
1586
        }
1587
        var updated bool
×
1588
        cont.log.Info("infraRtAttEntP updated, tDn : ", tdn)
×
1589

×
1590
        // tdn format for vpc : /uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1591
        // tdn format for single leaf : /uni/infra/funcprof/accportgrp-IPG_CLIENT_SIM
×
1592

×
1593
        // Ignore processing of single leaf
×
1594
        if !strings.Contains(tdn, "/accbundle-") {
×
1595
                cont.log.Info("Skipping processing of infraRtAttEntP update, not applicable for non-VPC configuration: ", tdn)
×
1596
                return
×
1597
        }
×
1598

1599
        // extract esxi1-vpc-ipg
1600
        parts := strings.Split(tdn, "/")
×
1601
        lastPart := parts[len(parts)-1]
×
1602
        host := strings.TrimPrefix(lastPart, "accbundle-")
×
1603

×
1604
        // adding entry for ipg in hostFabricPathDnMap
×
1605
        cont.indexMutex.Lock()
×
1606
        _, exists := cont.hostFabricPathDnMap[tdn]
×
1607
        if !exists {
×
1608
                var hostInfo hostFabricInfo
×
1609
                hostInfo.host = host
×
1610
                hostInfo.vpcIfDn = make(map[string]struct{})
×
1611
                cont.hostFabricPathDnMap[tdn] = hostInfo
×
1612
        }
×
1613
        cont.indexMutex.Unlock()
×
1614

×
1615
        accBndlGrpFilter := fmt.Sprintf(`query-target-filter=and(eq(vpcIf.accBndlGrpDn,"%s"))`, tdn)
×
1616
        url := fmt.Sprintf("/api/class/vpcIf.json?%s", accBndlGrpFilter)
×
1617
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1618
        if err != nil {
×
1619
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1620
                return
×
1621
        }
×
1622

1623
        for _, obj := range apicresp.Imdata {
×
1624
                if cont.updateHostFabricPathDnMap(obj) && !updated {
×
1625
                        updated = true
×
1626
                }
×
1627
        }
1628

1629
        if updated {
×
1630
                cont.updateDeviceCluster()
×
1631
        }
×
1632
        return
×
1633
}
1634

1635
func (cont *AciController) opflexDeviceChanged(obj apicapi.ApicObject) {
1✔
1636
        devType := obj.GetAttrStr("devType")
1✔
1637
        domName := obj.GetAttrStr("domName")
1✔
1638
        ctrlrName := obj.GetAttrStr("ctrlrName")
1✔
1639

1✔
1640
        if !cont.config.DisableServiceVlanPreprovisioning && strings.Contains(cont.config.Flavor, "openstack") {
1✔
1641
                if cont.openStackOpflexOdevUpdate(obj) {
×
1642
                        cont.log.Info("OpenStack opflexODev for ", obj.GetAttrStr("hostName"), " is added")
×
1643
                        cont.updateDeviceCluster()
×
1644
                }
×
1645
        }
1646
        if (devType == cont.env.OpFlexDeviceType()) && (domName == cont.config.AciVmmDomain) && (ctrlrName == cont.config.AciVmmController) {
2✔
1647
                cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Processing opflex device update")
1✔
1648
                if obj.GetAttrStr("state") == "disconnected" {
2✔
1649
                        cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Opflex device disconnected")
1✔
1650
                        cont.indexMutex.Lock()
1✔
1651
                        for node, devices := range cont.nodeOpflexDevice {
1✔
1652
                                if node == obj.GetAttrStr("hostName") {
×
1653
                                        for _, device := range devices {
×
1654
                                                if device.GetDn() == obj.GetDn() {
×
1655
                                                        device.SetAttr("state", "disconnected")
×
1656
                                                        cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Debug("Opflex device cache updated for disconnected node")
×
1657
                                                }
×
1658
                                        }
1659
                                        cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", devices)
×
1660
                                        break
×
1661
                                }
1662
                        }
1663
                        cont.indexMutex.Unlock()
1✔
1664
                        cont.updateDeviceCluster()
1✔
1665
                        return
1✔
1666
                }
1667
                var nodeUpdates []string
1✔
1668

1✔
1669
                cont.indexMutex.Lock()
1✔
1670
                nodefound := false
1✔
1671
                for node, devices := range cont.nodeOpflexDevice {
2✔
1672
                        found := false
1✔
1673

1✔
1674
                        if node == obj.GetAttrStr("hostName") {
2✔
1675
                                nodefound = true
1✔
1676
                        }
1✔
1677

1678
                        for i, device := range devices {
2✔
1679
                                if device.GetDn() != obj.GetDn() {
2✔
1680
                                        continue
1✔
1681
                                }
1682
                                found = true
1✔
1683

1✔
1684
                                if obj.GetAttrStr("hostName") != node {
2✔
1685
                                        cont.fabricPathLogger(node, device).
1✔
1686
                                                Debug("Moving opflex device from node")
1✔
1687

1✔
1688
                                        devices = append(devices[:i], devices[i+1:]...)
1✔
1689
                                        cont.nodeOpflexDevice[node] = devices
1✔
1690
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1691
                                        break
1✔
1692
                                } else if (device.GetAttrStr("mac") != obj.GetAttrStr("mac")) ||
1✔
1693
                                        (device.GetAttrStr("fabricPathDn") != obj.GetAttrStr("fabricPathDn")) ||
1✔
1694
                                        (device.GetAttrStr("state") != obj.GetAttrStr("state")) {
2✔
1695
                                        cont.fabricPathLogger(node, obj).
1✔
1696
                                                Debug("Updating opflex device")
1✔
1697

1✔
1698
                                        devices = append(append(devices[:i], devices[i+1:]...), obj)
1✔
1699
                                        cont.nodeOpflexDevice[node] = devices
1✔
1700
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1701
                                        break
1✔
1702
                                }
1703
                        }
1704
                        if !found && obj.GetAttrStr("hostName") == node {
2✔
1705
                                cont.fabricPathLogger(node, obj).
1✔
1706
                                        Debug("Appending opflex device")
1✔
1707

1✔
1708
                                devices = append(devices, obj)
1✔
1709
                                cont.nodeOpflexDevice[node] = devices
1✔
1710
                                nodeUpdates = append(nodeUpdates, node)
1✔
1711
                        }
1✔
1712
                }
1713
                if !nodefound {
2✔
1714
                        node := obj.GetAttrStr("hostName")
1✔
1715
                        cont.fabricPathLogger(node, obj).Debug("Adding opflex device")
1✔
1716
                        cont.nodeOpflexDevice[node] = apicapi.ApicSlice{obj}
1✔
1717
                        nodeUpdates = append(nodeUpdates, node)
1✔
1718
                }
1✔
1719
                cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", cont.nodeOpflexDevice[obj.GetAttrStr("hostName")])
1✔
1720
                cont.indexMutex.Unlock()
1✔
1721

1✔
1722
                for _, node := range nodeUpdates {
2✔
1723
                        cont.env.NodeServiceChanged(node)
1✔
1724
                        cont.erspanSyncOpflexDev()
1✔
1725
                }
1✔
1726
                cont.updateDeviceCluster()
1✔
1727
        }
1728
}
1729

1730
func (cont *AciController) postOpflexDeviceDelete(nodes []string) {
1✔
1731
        cont.updateDeviceCluster()
1✔
1732
        for _, node := range nodes {
2✔
1733
                cont.env.NodeServiceChanged(node)
1✔
1734
                cont.erspanSyncOpflexDev()
1✔
1735
        }
1✔
1736
}
1737

1738
func (cont *AciController) opflexDeviceDeleted(dn string) {
1✔
1739
        var nodeUpdates []string
1✔
1740
        var dnFound bool //to check if the dn belongs to this cluster
1✔
1741
        cont.log.Info("Processing opflex device delete notification of ", dn)
1✔
1742
        cont.indexMutex.Lock()
1✔
1743
        for node, devices := range cont.nodeOpflexDevice {
2✔
1744
                for i, device := range devices {
2✔
1745
                        if device.GetDn() != dn {
2✔
1746
                                continue
1✔
1747
                        }
1748
                        dnFound = true
1✔
1749
                        cont.fabricPathLogger(node, device).
1✔
1750
                                Debug("Deleting opflex device path")
1✔
1751
                        devices = append(devices[:i], devices[i+1:]...)
1✔
1752
                        cont.nodeOpflexDevice[node] = devices
1✔
1753
                        cont.log.Info("Deleted opflex device of node ", node, ": ", dn)
1✔
1754
                        nodeUpdates = append(nodeUpdates, node)
1✔
1755
                        break
1✔
1756
                }
1757
                if len(devices) == 0 {
2✔
1758
                        delete(cont.nodeOpflexDevice, node)
1✔
1759
                }
1✔
1760
        }
1761

1762
        // For clusters other than OpenShift On OpenStack,
1763
        // openStackFabricPathDnMap will be empty
1764
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1765
                if _, ok := opflexOdevInfo.opflexODevDn[dn]; ok {
×
1766
                        cont.log.Info("Received OpenStack opflexODev delete notification for ", dn)
×
1767
                        delete(opflexOdevInfo.opflexODevDn, dn)
×
1768
                        if len(opflexOdevInfo.opflexODevDn) < 1 {
×
1769
                                delete(cont.openStackFabricPathDnMap, host)
×
1770
                                cont.log.Info("OpenStack opflexODev of host ", host, " is deleted from cache")
×
1771
                                dnFound = true
×
1772
                        } else {
×
1773
                                cont.openStackFabricPathDnMap[host] = opflexOdevInfo
×
1774
                        }
×
1775
                        break
×
1776
                }
1777
        }
1778
        cont.indexMutex.Unlock()
1✔
1779

1✔
1780
        if dnFound {
2✔
1781
                cont.postOpflexDeviceDelete(nodeUpdates)
1✔
1782
        }
1✔
1783
}
1784

1785
func (cont *AciController) writeApicSvc(key string, service *v1.Service) {
1✔
1786
        if cont.isCNOEnabled() {
1✔
1787
                return
×
1788
        }
×
1789
        aobj := apicapi.NewVmmInjectedSvc(cont.vmmDomainProvider(),
1✔
1790
                cont.config.AciVmmDomain, cont.config.AciVmmController,
1✔
1791
                service.Namespace, service.Name)
1✔
1792
        aobjDn := aobj.GetDn()
1✔
1793
        aobj.SetAttr("guid", string(service.UID))
1✔
1794

1✔
1795
        svcns := service.ObjectMeta.Namespace
1✔
1796
        _, exists, err := cont.namespaceIndexer.GetByKey(svcns)
1✔
1797
        if err != nil {
1✔
1798
                cont.log.Error("Failed to lookup ns : ", svcns, " ", err)
×
1799
                return
×
1800
        }
×
1801
        if !exists {
2✔
1802
                cont.log.Debug("Namespace of service ", service.ObjectMeta.Name, ": ", svcns, " doesn't exist, hence not sending an update to the APIC")
1✔
1803
                return
1✔
1804
        }
1✔
1805

1806
        if !cont.serviceEndPoints.SetServiceApicObject(aobj, service) {
2✔
1807
                return
1✔
1808
        }
1✔
1809
        var setApicSvcDnsName bool
1✔
1810
        if len(cont.config.ApicHosts) != 0 && apicapi.ApicVersion >= "5.1" {
1✔
1811
                setApicSvcDnsName = true
×
1812
        }
×
1813
        // APIC model only allows one of these
1814
        for _, ingress := range service.Status.LoadBalancer.Ingress {
1✔
1815
                if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1816
                        aobj.SetAttr("lbIp", ingress.IP)
×
1817
                } else if ingress.Hostname != "" {
×
1818
                        ipList, err := net.LookupHost(ingress.Hostname)
×
1819
                        if err == nil && len(ipList) > 0 {
×
1820
                                aobj.SetAttr("lbIp", ipList[0])
×
1821
                        } else {
×
1822
                                cont.log.Errorf("Lookup: err: %v, ipList: %+v", err, ipList)
×
1823
                        }
×
1824
                }
1825
                break
×
1826
        }
1827
        if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != "None" {
2✔
1828
                aobj.SetAttr("clusterIp", service.Spec.ClusterIP)
1✔
1829
        }
1✔
1830

1831
        var t string
1✔
1832
        switch service.Spec.Type {
1✔
1833
        case v1.ServiceTypeClusterIP:
×
1834
                t = "clusterIp"
×
1835
        case v1.ServiceTypeNodePort:
×
1836
                t = "nodePort"
×
1837
        case v1.ServiceTypeLoadBalancer:
1✔
1838
                t = "loadBalancer"
1✔
1839
        case v1.ServiceTypeExternalName:
×
1840
                t = "externalName"
×
1841
        }
1842
        if t != "" {
2✔
1843
                aobj.SetAttr("type", t)
1✔
1844
        }
1✔
1845

1846
        if setApicSvcDnsName || cont.config.Flavor == "k8s-overlay" {
1✔
1847
                dnsName := fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace)
×
1848

×
1849
                for _, ingress := range service.Status.LoadBalancer.Ingress {
×
1850
                        if ingress.Hostname != "" {
×
1851
                                aobj.SetAttr("dnsName", ingress.Hostname)
×
1852
                        } else if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1853
                                aobj.SetAttr("dnsName", dnsName)
×
1854
                        }
×
1855
                }
1856
                if t == "clusterIp" || t == "nodePort" || t == "externalName" {
×
1857
                        aobj.SetAttr("dnsName", dnsName)
×
1858
                }
×
1859
        }
1860
        for _, port := range service.Spec.Ports {
2✔
1861
                proto := getProtocolStr(port.Protocol)
1✔
1862
                p := apicapi.NewVmmInjectedSvcPort(aobjDn,
1✔
1863
                        strconv.Itoa(int(port.Port)), proto, port.TargetPort.String())
1✔
1864
                p.SetAttr("nodePort", strconv.Itoa(int(port.NodePort)))
1✔
1865
                aobj.AddChild(p)
1✔
1866
        }
1✔
1867
        if cont.config.EnableVmmInjectedLabels && service.ObjectMeta.Labels != nil && apicapi.ApicVersion >= "5.2" {
1✔
1868
                for key, val := range service.ObjectMeta.Labels {
×
1869
                        newLabelKey := cont.aciNameForKey("label", key)
×
1870
                        label := apicapi.NewVmmInjectedLabel(aobj.GetDn(),
×
1871
                                newLabelKey, val)
×
1872
                        aobj.AddChild(label)
×
1873
                }
×
1874
        }
1875
        name := cont.aciNameForKey("service-vmm", key)
1✔
1876
        cont.log.Debug("Write Service Object: ", aobj)
1✔
1877
        cont.apicConn.WriteApicObjects(name, apicapi.ApicSlice{aobj})
1✔
1878
        cont.log.Debugf("svcObject: %+v", aobj)
1✔
1879
}
1880

1881
func removeAllConditions(conditions []metav1.Condition, conditionType string) []metav1.Condition {
1✔
1882
        i := 0
1✔
1883
        for _, cond := range conditions {
1✔
1884
                if cond.Type != conditionType {
×
1885
                        conditions[i] = cond
×
1886
                }
×
1887
        }
1888
        return conditions[:i]
1✔
1889
}
1890

1891
func (cont *AciController) updateServiceCondition(service *v1.Service, success bool, reason string, message string) bool {
1✔
1892
        conditionType := "LbIpamAllocation"
1✔
1893

1✔
1894
        var condition metav1.Condition
1✔
1895
        if success {
2✔
1896
                condition.Status = metav1.ConditionTrue
1✔
1897
        } else {
2✔
1898
                condition.Status = metav1.ConditionFalse
1✔
1899
                condition.Message = message
1✔
1900
        }
1✔
1901
        condition.Type = conditionType
1✔
1902
        condition.Reason = reason
1✔
1903
        condition.LastTransitionTime = metav1.Time{time.Now()}
1✔
1904
        for _, cond := range service.Status.Conditions {
2✔
1905
                if cond.Type == conditionType &&
1✔
1906
                        cond.Status == condition.Status &&
1✔
1907
                        cond.Message == condition.Message &&
1✔
1908
                        cond.Reason == condition.Reason {
2✔
1909
                        return false
1✔
1910
                }
1✔
1911
        }
1912

1913
        service.Status.Conditions = removeAllConditions(service.Status.Conditions, conditionType)
1✔
1914
        service.Status.Conditions = append(service.Status.Conditions, condition)
1✔
1915
        return true
1✔
1916
}
1917

1918
func (cont *AciController) validateRequestedIps(lbIpList []string) (net.IP, net.IP, bool) {
1✔
1919
        var ipv4, ipv6 net.IP
1✔
1920
        for _, lbIp := range lbIpList {
2✔
1921
                ip := net.ParseIP(lbIp)
1✔
1922
                if ip != nil {
2✔
1923
                        if ip.To4() != nil {
2✔
1924
                                if ipv4.Equal(net.IP{}) {
2✔
1925
                                        ipv4 = ip
1✔
1926
                                } else {
2✔
1927
                                        cont.log.Error("Annotation should have only one ipv4")
1✔
1928
                                        return ipv4, ipv6, false
1✔
1929
                                }
1✔
1930
                        } else if ip.To16() != nil {
2✔
1931
                                if ipv6.Equal(net.IP{}) {
2✔
1932
                                        ipv6 = ip
1✔
1933
                                } else {
2✔
1934
                                        cont.log.Error("Annotation should have only one ipv6")
1✔
1935
                                        return ipv4, ipv6, false
1✔
1936
                                }
1✔
1937
                        }
1938
                }
1939
        }
1940
        return ipv4, ipv6, true
1✔
1941
}
1942

1943
func (cont *AciController) returnUnusedStaticIngressIps(staticIngressIps, requestedIps []net.IP) {
1✔
1944
        for _, staticIp := range staticIngressIps {
2✔
1945
                found := false
1✔
1946
                for _, reqIp := range requestedIps {
2✔
1947
                        if reqIp.Equal(staticIp) {
2✔
1948
                                found = true
1✔
1949
                        }
1✔
1950
                }
1951
                if !found {
2✔
1952
                        returnIps(cont.staticServiceIps, []net.IP{staticIp})
1✔
1953
                }
1✔
1954
        }
1955
}
1956

1957
func (cont *AciController) allocateServiceIps(servicekey string,
1958
        service *v1.Service) bool {
1✔
1959
        logger := serviceLogger(cont.log, service)
1✔
1960
        cont.indexMutex.Lock()
1✔
1961
        meta, ok := cont.serviceMetaCache[servicekey]
1✔
1962
        if !ok {
2✔
1963
                meta = &serviceMeta{}
1✔
1964
                cont.serviceMetaCache[servicekey] = meta
1✔
1965

1✔
1966
                // Read any existing IPs and attempt to allocate them to the pod
1✔
1967
                for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1968
                        ip := net.ParseIP(ingress.IP)
1✔
1969
                        if ip == nil {
1✔
1970
                                continue
×
1971
                        }
1972
                        if ip.To4() != nil {
2✔
1973
                                if cont.serviceIps.GetV4IpCache()[0].RemoveIp(ip) {
2✔
1974
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1975
                                } else if cont.staticServiceIps.V4.RemoveIp(ip) {
3✔
1976
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1977
                                }
1✔
1978
                        } else if ip.To16() != nil {
2✔
1979
                                if cont.serviceIps.GetV6IpCache()[0].RemoveIp(ip) {
2✔
1980
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1981
                                } else if cont.staticServiceIps.V6.RemoveIp(ip) {
3✔
1982
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1983
                                }
1✔
1984
                        }
1985
                }
1986
        }
1987

1988
        if !cont.serviceSyncEnabled {
2✔
1989
                cont.indexMutex.Unlock()
1✔
1990
                return false
1✔
1991
        }
1✔
1992

1993
        var requestedIps []net.IP
1✔
1994
        // try to give the requested load balancer IP to the pod
1✔
1995
        lbIps, ok := service.ObjectMeta.Annotations[metadata.LbIpAnnotation]
1✔
1996
        if ok {
2✔
1997
                lbIpList := strings.Split(lbIps, ",")
1✔
1998
                ipv4, ipv6, valid := cont.validateRequestedIps(lbIpList)
1✔
1999
                if valid {
2✔
2000
                        if ipv4 != nil {
2✔
2001
                                requestedIps = append(requestedIps, ipv4)
1✔
2002
                        }
1✔
2003
                        if ipv6 != nil {
2✔
2004
                                requestedIps = append(requestedIps, ipv6)
1✔
2005
                        }
1✔
2006
                } else {
1✔
2007
                        cont.returnServiceIps(meta.ingressIps)
1✔
2008
                        cont.log.Error("Invalid LB IP annotation for service ", servicekey)
1✔
2009
                        condUpdated := cont.updateServiceCondition(service, false, "InvalidAnnotation", "Invalid Loadbalancer IP annotation")
1✔
2010
                        if condUpdated {
2✔
2011
                                _, err := cont.updateServiceStatus(service)
1✔
2012
                                if err != nil {
1✔
2013
                                        logger.Error("Failed to update service status : ", err)
×
2014
                                        cont.indexMutex.Unlock()
×
2015
                                        return true
×
2016
                                }
×
2017
                        }
2018
                        cont.indexMutex.Unlock()
1✔
2019
                        return false
1✔
2020
                }
2021
        } else {
1✔
2022
                requestedIp := net.ParseIP(service.Spec.LoadBalancerIP)
1✔
2023
                if requestedIp != nil {
2✔
2024
                        requestedIps = append(requestedIps, requestedIp)
1✔
2025
                }
1✔
2026
        }
2027
        if len(requestedIps) > 0 {
2✔
2028
                meta.requestedIps = []net.IP{}
1✔
2029
                for _, requestedIp := range requestedIps {
2✔
2030
                        hasRequestedIp := false
1✔
2031
                        for _, ip := range meta.staticIngressIps {
2✔
2032
                                if reflect.DeepEqual(requestedIp, ip) {
2✔
2033
                                        hasRequestedIp = true
1✔
2034
                                }
1✔
2035
                        }
2036
                        if !hasRequestedIp {
2✔
2037
                                if requestedIp.To4() != nil &&
1✔
2038
                                        cont.staticServiceIps.V4.RemoveIp(requestedIp) {
2✔
2039
                                        hasRequestedIp = true
1✔
2040
                                } else if requestedIp.To16() != nil &&
2✔
2041
                                        cont.staticServiceIps.V6.RemoveIp(requestedIp) {
2✔
2042
                                        hasRequestedIp = true
1✔
2043
                                }
1✔
2044
                        }
2045
                        if hasRequestedIp {
2✔
2046
                                meta.requestedIps = append(meta.requestedIps, requestedIp)
1✔
2047
                        }
1✔
2048
                }
2049
                cont.returnUnusedStaticIngressIps(meta.staticIngressIps, meta.requestedIps)
1✔
2050
                meta.staticIngressIps = meta.requestedIps
1✔
2051
                cont.returnServiceIps(meta.ingressIps)
1✔
2052
                meta.ingressIps = nil
1✔
2053
                // If no requested ips are allocatable
1✔
2054
                if len(meta.requestedIps) < 1 {
2✔
2055
                        logger.Error("No Requested Ip addresses available for service ", servicekey)
1✔
2056
                        condUpdated := cont.updateServiceCondition(service, false, "RequestedIpsNotAllocatable", "The requested ips for loadbalancer service are not available or not in extern static range")
1✔
2057
                        if condUpdated {
2✔
2058
                                _, err := cont.updateServiceStatus(service)
1✔
2059
                                if err != nil {
1✔
2060
                                        cont.indexMutex.Unlock()
×
2061
                                        logger.Error("Failed to update service status: ", err)
×
2062
                                        return true
×
2063
                                }
×
2064
                        }
2065
                        cont.indexMutex.Unlock()
1✔
2066
                        return false
1✔
2067
                }
2068
        } else if len(meta.requestedIps) > 0 {
1✔
2069
                meta.requestedIps = nil
×
2070
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
×
2071
                meta.staticIngressIps = nil
×
2072
        }
×
2073
        ingressIps := make([]net.IP, 0)
1✔
2074
        ingressIps = append(ingressIps, meta.ingressIps...)
1✔
2075
        ingressIps = append(ingressIps, meta.staticIngressIps...)
1✔
2076

1✔
2077
        var ipv4, ipv6 net.IP
1✔
2078
        for _, ip := range ingressIps {
2✔
2079
                if ip.To4() != nil {
2✔
2080
                        ipv4 = ip
1✔
2081
                } else if ip.To16() != nil {
3✔
2082
                        ipv6 = ip
1✔
2083
                }
1✔
2084
        }
2085
        var clusterIPv4, clusterIPv6 net.IP
1✔
2086
        clusterIPs := append([]string{service.Spec.ClusterIP}, service.Spec.ClusterIPs...)
1✔
2087
        for _, ipStr := range clusterIPs {
2✔
2088
                ip := net.ParseIP(ipStr)
1✔
2089
                if ip == nil {
1✔
2090
                        continue
×
2091
                }
2092
                if ip.To4() != nil && clusterIPv4 == nil {
2✔
2093
                        clusterIPv4 = ip
1✔
2094
                } else if ip.To16() != nil && strings.Contains(ip.String(), ":") && clusterIPv6 == nil {
3✔
2095
                        clusterIPv6 = ip
1✔
2096
                }
1✔
2097
        }
2098
        if clusterIPv4 != nil && ipv4 == nil {
2✔
2099
                if len(requestedIps) < 1 {
2✔
2100
                        ipv4, _ = cont.serviceIps.AllocateIp(true)
1✔
2101
                        if ipv4 != nil {
2✔
2102
                                ingressIps = append(ingressIps, ipv4)
1✔
2103
                        }
1✔
2104
                }
2105
        } else if clusterIPv4 == nil && ipv4 != nil {
1✔
2106
                cont.removeIpFromIngressIPList(&ingressIps, ipv4)
×
2107
        }
×
2108

2109
        if clusterIPv6 != nil && ipv6 == nil {
2✔
2110
                if len(requestedIps) < 1 {
2✔
2111
                        ipv6, _ = cont.serviceIps.AllocateIp(false)
1✔
2112
                        if ipv6 != nil {
2✔
2113
                                ingressIps = append(ingressIps, ipv6)
1✔
2114
                        }
1✔
2115
                }
2116
        } else if clusterIPv6 == nil && ipv6 != nil {
1✔
2117
                cont.removeIpFromIngressIPList(&ingressIps, ipv6)
×
2118
        }
×
2119

2120
        if len(requestedIps) < 1 {
2✔
2121
                meta.ingressIps = ingressIps
1✔
2122
        }
1✔
2123
        if ipv4 == nil && ipv6 == nil {
2✔
2124
                logger.Error("No IP addresses available for service")
1✔
2125
                cont.indexMutex.Unlock()
1✔
2126
                return true
1✔
2127
        }
1✔
2128
        cont.indexMutex.Unlock()
1✔
2129
        var newIngress []v1.LoadBalancerIngress
1✔
2130
        for _, ip := range meta.ingressIps {
2✔
2131
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2132
        }
1✔
2133
        for _, ip := range meta.staticIngressIps {
2✔
2134
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2135
        }
1✔
2136

2137
        ipUpdated := false
1✔
2138
        if !reflect.DeepEqual(newIngress, service.Status.LoadBalancer.Ingress) {
2✔
2139
                service.Status.LoadBalancer.Ingress = newIngress
1✔
2140

1✔
2141
                logger.WithFields(logrus.Fields{
1✔
2142
                        "status": service.Status.LoadBalancer.Ingress,
1✔
2143
                }).Info("Updating service load balancer status")
1✔
2144

1✔
2145
                ipUpdated = true
1✔
2146
        }
1✔
2147

2148
        success := true
1✔
2149
        reason := "Success"
1✔
2150
        message := ""
1✔
2151
        if len(requestedIps) > 0 && len(requestedIps) != len(meta.staticIngressIps) {
1✔
2152
                success = false
×
2153
                reason = "OneIpNotAllocatable"
×
2154
                message = "One of the requested Ips is not allocatable"
×
2155
        }
×
2156
        condUpdated := cont.updateServiceCondition(service, success, reason, message)
1✔
2157
        if ipUpdated || condUpdated {
2✔
2158
                _, err := cont.updateServiceStatus(service)
1✔
2159
                if err != nil {
1✔
2160
                        logger.Error("Failed to update service status: ", err)
×
2161
                        return true
×
2162
                }
×
2163
        }
2164
        return false
1✔
2165
}
2166

2167
func (cont *AciController) handleServiceDelete(servicekey string) bool {
1✔
2168
        if cont.isCNOEnabled() {
1✔
2169
                return false
×
2170
        }
×
2171
        cont.clearLbService(servicekey)
1✔
2172
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("service-vmm",
1✔
2173
                servicekey))
1✔
2174
        return false
1✔
2175
}
2176

2177
func (cont *AciController) handleServiceUpdate(service *v1.Service) bool {
1✔
2178
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2179
        if err != nil {
1✔
2180
                serviceLogger(cont.log, service).
×
2181
                        Error("Could not create service key: ", err)
×
2182
                return false
×
2183
        }
×
2184
        if cont.isCNOEnabled() {
1✔
2185
                return false
×
2186
        }
×
2187
        var requeue bool
1✔
2188
        isLoadBalancer := service.Spec.Type == v1.ServiceTypeLoadBalancer
1✔
2189
        if isLoadBalancer {
2✔
2190
                if *cont.config.AllocateServiceIps {
2✔
2191
                        requeue = cont.allocateServiceIps(servicekey, service)
1✔
2192
                }
1✔
2193
                // Check if any existing SNAT policy (with no explicit SnatIp)
2194
                // matches this service. This handles the case where the SNAT
2195
                // policy was created before the service existed.
2196
                cont.indexMutex.Lock()
1✔
2197
                if _, exists := cont.snatServices[servicekey]; !exists {
2✔
2198
                        for _, policy := range cont.snatPolicyCache {
1✔
UNCOV
2199
                                if len(policy.SnatIp) == 0 {
×
UNCOV
2200
                                        if len(policy.Selector.Namespace) == 0 ||
×
UNCOV
2201
                                                policy.Selector.Namespace == service.ObjectMeta.Namespace {
×
UNCOV
2202
                                                selector := labels.SelectorFromSet(labels.Set(policy.Selector.Labels))
×
UNCOV
2203
                                                if selector.Matches(labels.Set(service.ObjectMeta.Labels)) {
×
UNCOV
2204
                                                        cont.snatServices[servicekey] = true
×
UNCOV
2205
                                                        break
×
2206
                                                }
2207
                                        }
2208
                                }
2209
                        }
2210
                }
2211
                if cont.serviceSyncEnabled {
2✔
2212
                        cont.indexMutex.Unlock()
1✔
2213
                        err = cont.updateServiceDeviceInstance(servicekey, service)
1✔
2214
                        if err != nil {
1✔
2215
                                serviceLogger(cont.log, service).
×
2216
                                        Error("Failed to update service device Instance: ", err)
×
2217
                                return true
×
2218
                        }
×
2219
                } else {
1✔
2220
                        cont.indexMutex.Unlock()
1✔
2221
                }
1✔
2222
        } else {
1✔
2223
                cont.clearLbService(servicekey)
1✔
2224
        }
1✔
2225
        cont.writeApicSvc(servicekey, service)
1✔
2226
        return requeue
1✔
2227
}
2228

2229
func (cont *AciController) clearLbService(servicekey string) {
1✔
2230
        cont.indexMutex.Lock()
1✔
2231
        if meta, ok := cont.serviceMetaCache[servicekey]; ok {
2✔
2232
                cont.returnServiceIps(meta.ingressIps)
1✔
2233
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
1✔
2234
                delete(cont.serviceMetaCache, servicekey)
1✔
2235
                delete(cont.snatServices, servicekey)
1✔
2236
        }
1✔
2237
        cont.indexMutex.Unlock()
1✔
2238
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("svc", servicekey))
1✔
2239
}
2240

2241
func getEndpointsIps(endpoints *v1.Endpoints) map[string]bool {
1✔
2242
        ips := make(map[string]bool)
1✔
2243
        for _, subset := range endpoints.Subsets {
2✔
2244
                for _, addr := range subset.Addresses {
2✔
2245
                        ips[addr.IP] = true
1✔
2246
                }
1✔
2247
                for _, addr := range subset.NotReadyAddresses {
1✔
2248
                        ips[addr.IP] = true
×
2249
                }
×
2250
        }
2251
        return ips
1✔
2252
}
2253

2254
func (cont *AciController) processServiceTargetPorts(service *v1.Service, svcKey string, old bool) map[string]targetPort {
1✔
2255
        ports := make(map[string]targetPort)
1✔
2256
        for _, port := range service.Spec.Ports {
2✔
2257
                var key string
1✔
2258
                portnums := make(map[int]bool)
1✔
2259

1✔
2260
                if port.TargetPort.Type == intstr.String {
2✔
2261
                        entry, exists := cont.namedPortServiceIndex[svcKey]
1✔
2262
                        if !old {
2✔
2263
                                if !exists {
2✔
2264
                                        cont.log.Debugf("Creating named port index for service: %s, port: %s", svcKey, port.Name)
1✔
2265
                                        newEntry := make(namedPortServiceIndexEntry)
1✔
2266
                                        entry = &newEntry
1✔
2267
                                }
1✔
2268
                                (*entry)[port.Name] = &namedPortServiceIndexPort{
1✔
2269
                                        targetPortName: port.TargetPort.String(),
1✔
2270
                                        resolvedPorts:  make(map[int]bool),
1✔
2271
                                }
1✔
2272
                                cont.namedPortServiceIndex[svcKey] = entry
1✔
2273
                        } else if exists {
2✔
2274
                                delete(*entry, port.Name)
1✔
2275
                                cont.log.Debugf("Removed named port index for service: %s port: %s, entry: %v", svcKey, port.Name, entry)
1✔
2276
                                if len(*entry) == 0 {
2✔
2277
                                        delete(cont.namedPortServiceIndex, svcKey)
1✔
2278
                                } else {
2✔
2279
                                        cont.namedPortServiceIndex[svcKey] = entry
1✔
2280
                                }
1✔
2281
                        }
2282
                        key = portProto(&port.Protocol) + "-name-" + port.TargetPort.String()
1✔
2283
                } else {
1✔
2284
                        portNum := port.TargetPort.IntValue()
1✔
2285
                        if portNum <= 0 {
2✔
2286
                                portNum = int(port.Port)
1✔
2287
                        }
1✔
2288
                        key = portProto(&port.Protocol) + "-num-" + strconv.Itoa(portNum)
1✔
2289
                        portnums[portNum] = true
1✔
2290
                }
2291

2292
                ports[key] = targetPort{
1✔
2293
                        proto: port.Protocol,
1✔
2294
                        ports: portnums,
1✔
2295
                }
1✔
2296
        }
2297
        return ports
1✔
2298
}
2299

2300
func (cont *AciController) endpointsAdded(obj interface{}) {
1✔
2301
        endpoints := obj.(*v1.Endpoints)
1✔
2302
        servicekey, err := cache.MetaNamespaceKeyFunc(obj.(*v1.Endpoints))
1✔
2303
        if err != nil {
1✔
2304
                cont.log.Error("Could not create service key: ", err)
×
2305
                return
×
2306
        }
×
2307

2308
        ips := getEndpointsIps(endpoints)
1✔
2309
        cont.indexMutex.Lock()
1✔
2310
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2311
        cont.queueIPNetPolUpdates(ips)
1✔
2312
        cont.indexMutex.Unlock()
1✔
2313

1✔
2314
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2315

1✔
2316
        cont.queueServiceUpdateByKey(servicekey)
1✔
2317
}
2318

2319
func (cont *AciController) endpointsDeleted(obj interface{}) {
1✔
2320
        endpoints, isEndpoints := obj.(*v1.Endpoints)
1✔
2321
        if !isEndpoints {
1✔
2322
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2323
                if !ok {
×
2324
                        cont.log.Error("Received unexpected object: ", obj)
×
2325
                        return
×
2326
                }
×
2327
                endpoints, ok = deletedState.Obj.(*v1.Endpoints)
×
2328
                if !ok {
×
2329
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpoints object: ", deletedState.Obj)
×
2330
                        return
×
2331
                }
×
2332
        }
2333
        servicekey, err := cache.MetaNamespaceKeyFunc(endpoints)
1✔
2334
        if err != nil {
1✔
2335
                cont.log.Error("Could not create service key: ", err)
×
2336
                return
×
2337
        }
×
2338

2339
        ips := getEndpointsIps(endpoints)
1✔
2340
        cont.indexMutex.Lock()
1✔
2341
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2342
        cont.queueIPNetPolUpdates(ips)
1✔
2343
        cont.indexMutex.Unlock()
1✔
2344

1✔
2345
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2346

1✔
2347
        cont.queueServiceUpdateByKey(servicekey)
1✔
2348
}
2349

2350
func (cont *AciController) endpointsUpdated(oldEps, newEps interface{}) {
1✔
2351
        oldendpoints := oldEps.(*v1.Endpoints)
1✔
2352
        newendpoints := newEps.(*v1.Endpoints)
1✔
2353
        servicekey, err := cache.MetaNamespaceKeyFunc(newendpoints)
1✔
2354
        if err != nil {
1✔
2355
                cont.log.Error("Could not create service key: ", err)
×
2356
                return
×
2357
        }
×
2358

2359
        oldIps := getEndpointsIps(oldendpoints)
1✔
2360
        newIps := getEndpointsIps(newendpoints)
1✔
2361
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2362
                cont.indexMutex.Lock()
1✔
2363
                cont.queueIPNetPolUpdates(oldIps)
1✔
2364
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2365
                cont.queueIPNetPolUpdates(newIps)
1✔
2366
                cont.indexMutex.Unlock()
1✔
2367
        }
1✔
2368

2369
        if !reflect.DeepEqual(oldendpoints.Subsets, newendpoints.Subsets) {
2✔
2370
                cont.queueEndpointsNetPolUpdates(oldendpoints)
1✔
2371
                cont.queueEndpointsNetPolUpdates(newendpoints)
1✔
2372
        }
1✔
2373

2374
        cont.queueServiceUpdateByKey(servicekey)
1✔
2375
}
2376

2377
func (cont *AciController) serviceAdded(obj interface{}) {
1✔
2378
        service := obj.(*v1.Service)
1✔
2379
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2380
        if err != nil {
1✔
2381
                serviceLogger(cont.log, service).
×
2382
                        Error("Could not create service key: ", err)
×
2383
                return
×
2384
        }
×
2385

2386
        cont.indexMutex.Lock()
1✔
2387
        ports := cont.processServiceTargetPorts(service, servicekey, false)
1✔
2388
        cont.queuePortNetPolUpdates(ports)
1✔
2389
        cont.updateTargetPortIndex(true, servicekey, nil, ports)
1✔
2390
        cont.indexMutex.Unlock()
1✔
2391

1✔
2392
        cont.queueServiceUpdateByKey(servicekey)
1✔
2393
}
2394

2395
func (cont *AciController) serviceUpdated(oldSvc, newSvc interface{}) {
1✔
2396
        oldservice := oldSvc.(*v1.Service)
1✔
2397
        newservice := newSvc.(*v1.Service)
1✔
2398
        servicekey, err := cache.MetaNamespaceKeyFunc(newservice)
1✔
2399
        if err != nil {
1✔
2400
                serviceLogger(cont.log, newservice).
×
2401
                        Error("Could not create service key: ", err)
×
2402
                return
×
2403
        }
×
2404
        if !reflect.DeepEqual(oldservice.Spec.Ports, newservice.Spec.Ports) {
1✔
2405
                cont.indexMutex.Lock()
×
2406
                oldPorts := cont.processServiceTargetPorts(oldservice, servicekey, true)
×
2407
                newPorts := cont.processServiceTargetPorts(newservice, servicekey, false)
×
2408
                cont.queuePortNetPolUpdates(oldPorts)
×
2409
                cont.updateTargetPortIndex(true, servicekey, oldPorts, newPorts)
×
2410
                cont.queuePortNetPolUpdates(newPorts)
×
2411
                cont.indexMutex.Unlock()
×
2412
        }
×
2413
        cont.queueServiceUpdateByKey(servicekey)
1✔
2414
}
2415

2416
func (cont *AciController) serviceDeleted(obj interface{}) {
1✔
2417
        service, isService := obj.(*v1.Service)
1✔
2418
        if !isService {
1✔
2419
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2420
                if !ok {
×
2421
                        serviceLogger(cont.log, service).
×
2422
                                Error("Received unexpected object: ", obj)
×
2423
                        return
×
2424
                }
×
2425
                service, ok = deletedState.Obj.(*v1.Service)
×
2426
                if !ok {
×
2427
                        serviceLogger(cont.log, service).
×
2428
                                Error("DeletedFinalStateUnknown contained non-Services object: ", deletedState.Obj)
×
2429
                        return
×
2430
                }
×
2431
        }
2432
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2433
        if err != nil {
1✔
2434
                serviceLogger(cont.log, service).
×
2435
                        Error("Could not create service key: ", err)
×
2436
                return
×
2437
        }
×
2438

2439
        cont.indexMutex.Lock()
1✔
2440
        ports := cont.processServiceTargetPorts(service, servicekey, true)
1✔
2441
        cont.updateTargetPortIndex(true, servicekey, ports, nil)
1✔
2442
        cont.queuePortNetPolUpdates(ports)
1✔
2443
        cont.indexMutex.Unlock()
1✔
2444

1✔
2445
        deletedServiceKey := "DELETED_" + servicekey
1✔
2446
        cont.queueServiceUpdateByKey(deletedServiceKey)
1✔
2447
}
2448

2449
func (cont *AciController) serviceFullSync() {
1✔
2450
        cache.ListAll(cont.serviceIndexer, labels.Everything(),
1✔
2451
                func(sobj interface{}) {
2✔
2452
                        cont.queueServiceUpdate(sobj.(*v1.Service))
1✔
2453
                })
1✔
2454
}
2455

2456
func (cont *AciController) getEndpointSliceIps(endpointSlice *discovery.EndpointSlice) map[string]bool {
1✔
2457
        ips := make(map[string]bool)
1✔
2458
        for _, endpoints := range endpointSlice.Endpoints {
2✔
2459
                for _, addr := range endpoints.Addresses {
2✔
2460
                        ips[addr] = true
1✔
2461
                }
1✔
2462
        }
2463
        return ips
1✔
2464
}
2465

2466
func (cont *AciController) notReadyEndpointPresent(endpointSlice *discovery.EndpointSlice) bool {
×
2467
        for _, endpoints := range endpointSlice.Endpoints {
×
2468
                if (endpoints.Conditions.Ready != nil && !*endpoints.Conditions.Ready) &&
×
2469
                        (endpoints.Conditions.Terminating == nil || !*endpoints.Conditions.Terminating) {
×
2470
                        return true
×
2471
                }
×
2472
        }
2473
        return false
×
2474
}
2475

2476
func (cont *AciController) getEndpointSliceEpIps(endpoints *discovery.Endpoint) map[string]bool {
×
2477
        ips := make(map[string]bool)
×
2478
        for _, addr := range endpoints.Addresses {
×
2479
                ips[addr] = true
×
2480
        }
×
2481
        return ips
×
2482
}
2483

2484
func (cont *AciController) processDelayedEpSlices() {
1✔
2485
        var processEps []DelayedEpSlice
1✔
2486
        cont.indexMutex.Lock()
1✔
2487
        for i := 0; i < len(cont.delayedEpSlices); i++ {
1✔
2488
                delayedepslice := cont.delayedEpSlices[i]
×
2489
                if time.Now().After(delayedepslice.DelayedTime) {
×
2490
                        var toprocess DelayedEpSlice
×
2491
                        err := util.DeepCopyObj(&delayedepslice, &toprocess)
×
2492
                        if err != nil {
×
2493
                                cont.log.Error(err)
×
2494
                                continue
×
2495
                        }
2496
                        processEps = append(processEps, toprocess)
×
2497
                        cont.delayedEpSlices = append(cont.delayedEpSlices[:i], cont.delayedEpSlices[i+1:]...)
×
2498
                }
2499
        }
2500

2501
        cont.indexMutex.Unlock()
1✔
2502
        for _, epslice := range processEps {
1✔
2503
                //ignore the epslice if newly added endpoint is not ready
×
2504
                if cont.notReadyEndpointPresent(epslice.NewEpSlice) {
×
2505
                        cont.log.Debug("Ignoring the update as the new endpoint is not ready : ", epslice.NewEpSlice)
×
2506
                } else {
×
2507
                        cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice)
×
2508
                        cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice)
×
2509
                }
×
2510
        }
2511
}
2512

2513
func (cont *AciController) resolveServiceNamedPortFromEpSlice(epSlice *discovery.EndpointSlice, serviceKey string, old bool) {
1✔
2514
        indexEntry, ok := cont.namedPortServiceIndex[serviceKey]
1✔
2515
        if !ok {
2✔
2516
                return
1✔
2517
        }
1✔
2518
        for _, port := range epSlice.Ports {
2✔
2519
                if port.Name == nil || port.Port == nil {
1✔
2520
                        continue
×
2521
                }
2522
                if portEntry, ok := (*indexEntry)[*port.Name]; ok && portEntry != nil {
2✔
2523
                        portNum := int(*port.Port)
1✔
2524
                        if old {
2✔
2525
                                delete(portEntry.resolvedPorts, portNum)
1✔
2526
                                cont.log.Debugf("Deleting port: %d from service %s resolved target port. Resolved ports: %v", portNum, serviceKey, portEntry.resolvedPorts)
1✔
2527
                        } else {
2✔
2528
                                portEntry.resolvedPorts[portNum] = true
1✔
2529
                                cont.log.Debugf("Adding port: %d to service %s resolved target port. Resolved ports: %v", portNum, serviceKey, portEntry.resolvedPorts)
1✔
2530
                        }
1✔
2531
                        key := portProto(port.Protocol) + "-num-" + strconv.Itoa(portNum)
1✔
2532
                        targetPortIndexEntry := cont.targetPortIndex[key]
1✔
2533
                        if targetPortIndexEntry == nil && len(portEntry.resolvedPorts) == 1 {
2✔
2534
                                targetPortIndexEntry = &portIndexEntry{
1✔
2535
                                        port: targetPort{
1✔
2536
                                                proto: *port.Protocol,
1✔
2537
                                                ports: make(map[int]bool),
1✔
2538
                                        },
1✔
2539
                                        serviceKeys:       make(map[string]bool),
1✔
2540
                                        networkPolicyKeys: make(map[string]bool),
1✔
2541
                                }
1✔
2542
                                targetPortIndexEntry.port.ports[portNum] = true
1✔
2543
                                cont.targetPortIndex[key] = targetPortIndexEntry
1✔
2544
                        }
1✔
2545
                        if targetPortIndexEntry != nil {
2✔
2546
                                if len(portEntry.resolvedPorts) == 1 {
2✔
2547
                                        targetPortIndexEntry.serviceKeys[serviceKey] = true
1✔
2548
                                } else {
2✔
2549
                                        delete(targetPortIndexEntry.serviceKeys, serviceKey)
1✔
2550
                                }
1✔
2551
                        }
2552
                }
2553
        }
2554
}
2555
func (cont *AciController) endpointSliceAdded(obj interface{}) {
1✔
2556
        endpointslice, ok := obj.(*discovery.EndpointSlice)
1✔
2557
        if !ok {
1✔
2558
                cont.log.Error("error processing Endpointslice object: ", obj)
×
2559
                return
×
2560
        }
×
2561
        servicekey, valid := getServiceKey(endpointslice)
1✔
2562
        if !valid {
1✔
2563
                return
×
2564
        }
×
2565
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2566
        cont.indexMutex.Lock()
1✔
2567
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2568
        cont.resolveServiceNamedPortFromEpSlice(endpointslice, servicekey, false)
1✔
2569
        cont.queueIPNetPolUpdates(ips)
1✔
2570
        cont.indexMutex.Unlock()
1✔
2571

1✔
2572
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2573

1✔
2574
        cont.queueServiceUpdateByKey(servicekey)
1✔
2575
        cont.log.Info("EndPointSlice Object Added: ", servicekey)
1✔
2576
}
2577

2578
func (cont *AciController) endpointSliceDeleted(obj interface{}) {
1✔
2579
        endpointslice, isEndpointslice := obj.(*discovery.EndpointSlice)
1✔
2580
        if !isEndpointslice {
1✔
2581
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2582
                if !ok {
×
2583
                        cont.log.Error("Received unexpected object: ", obj)
×
2584
                        return
×
2585
                }
×
2586
                endpointslice, ok = deletedState.Obj.(*discovery.EndpointSlice)
×
2587
                if !ok {
×
2588
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpointslice object: ", deletedState.Obj)
×
2589
                        return
×
2590
                }
×
2591
        }
2592
        servicekey, valid := getServiceKey(endpointslice)
1✔
2593
        if !valid {
1✔
2594
                return
×
2595
        }
×
2596
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2597
        cont.indexMutex.Lock()
1✔
2598
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2599
        cont.resolveServiceNamedPortFromEpSlice(endpointslice, servicekey, true)
1✔
2600
        cont.queueIPNetPolUpdates(ips)
1✔
2601
        cont.indexMutex.Unlock()
1✔
2602
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2603
        cont.queueServiceUpdateByKey(servicekey)
1✔
2604
}
2605

2606
// Checks if the given service is present in the user configured list of services
2607
// for pbr delay and if present, returns the servie specific delay if configured
2608
func (cont *AciController) svcInAddDelayList(name, ns string) (int, bool) {
×
2609
        for _, svc := range cont.config.ServiceGraphEndpointAddDelay.Services {
×
2610
                if svc.Name == name && svc.Namespace == ns {
×
2611
                        return svc.Delay, true
×
2612
                }
×
2613
        }
2614
        return 0, false
×
2615
}
2616

2617
// Check if the endpointslice update notification has any deletion of enpoint
2618
func (cont *AciController) isDeleteEndpointSlice(oldendpointslice, newendpointslice *discovery.EndpointSlice) bool {
×
2619
        del := false
×
2620

×
2621
        // if any endpoint is removed from endpontslice
×
2622
        if len(newendpointslice.Endpoints) < len(oldendpointslice.Endpoints) {
×
2623
                del = true
×
2624
        }
×
2625

2626
        if !del {
×
2627
                // if any one of the endpoint is in terminating state
×
2628
                for _, endpoint := range newendpointslice.Endpoints {
×
2629
                        if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
×
2630
                                del = true
×
2631
                                break
×
2632
                        }
2633
                }
2634
        }
2635
        if !del {
×
2636
                // if any one of endpoint moved from ready state to not-ready state
×
2637
                for ix := range oldendpointslice.Endpoints {
×
2638
                        oldips := cont.getEndpointSliceEpIps(&oldendpointslice.Endpoints[ix])
×
2639
                        for newIx := range newendpointslice.Endpoints {
×
2640
                                newips := cont.getEndpointSliceEpIps(&newendpointslice.Endpoints[newIx])
×
2641
                                if reflect.DeepEqual(oldips, newips) {
×
2642
                                        if (oldendpointslice.Endpoints[ix].Conditions.Ready != nil && *oldendpointslice.Endpoints[ix].Conditions.Ready) &&
×
2643
                                                (newendpointslice.Endpoints[newIx].Conditions.Ready != nil && !*newendpointslice.Endpoints[newIx].Conditions.Ready) {
×
2644
                                                del = true
×
2645
                                        }
×
2646
                                        break
×
2647
                                }
2648
                        }
2649
                }
2650
        }
2651
        return del
×
2652
}
2653

2654
func (cont *AciController) doendpointSliceUpdatedDelay(oldendpointslice *discovery.EndpointSlice,
2655
        newendpointslice *discovery.EndpointSlice) {
×
2656
        svc, ns, valid := getServiceNameAndNs(newendpointslice)
×
2657
        if !valid {
×
2658
                return
×
2659
        }
×
2660
        svckey, valid := getServiceKey(newendpointslice)
×
2661
        if !valid {
×
2662
                return
×
2663
        }
×
2664
        delay := cont.config.ServiceGraphEndpointAddDelay.Delay
×
2665
        svcDelay, exists := cont.svcInAddDelayList(svc, ns)
×
2666
        if svcDelay > 0 {
×
2667
                delay = svcDelay
×
2668
        }
×
2669
        delayedsvc := exists && delay > 0
×
2670
        if delayedsvc {
×
2671
                cont.log.Debug("Delay of ", delay, " seconds is applicable for svc :", svc, " in ns: ", ns)
×
2672
                var delayedepslice DelayedEpSlice
×
2673
                delayedepslice.OldEpSlice = oldendpointslice
×
2674
                delayedepslice.ServiceKey = svckey
×
2675
                delayedepslice.NewEpSlice = newendpointslice
×
2676
                currentTime := time.Now()
×
2677
                delayedepslice.DelayedTime = currentTime.Add(time.Duration(delay) * time.Second)
×
2678
                cont.indexMutex.Lock()
×
2679
                cont.delayedEpSlices = append(cont.delayedEpSlices, &delayedepslice)
×
2680
                cont.indexMutex.Unlock()
×
2681
        } else {
×
2682
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2683
        }
×
2684

2685
        if delayedsvc && cont.isDeleteEndpointSlice(oldendpointslice, newendpointslice) {
×
2686
                cont.log.Debug("Proceeding by ignoring delay as the update is due to delete of endpoint")
×
2687
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2688
        }
×
2689
}
2690

2691
func (cont *AciController) endpointSliceUpdated(oldobj, newobj interface{}) {
1✔
2692
        oldendpointslice, ok := oldobj.(*discovery.EndpointSlice)
1✔
2693
        if !ok {
1✔
2694
                cont.log.Error("error processing Endpointslice object: ", oldobj)
×
2695
                return
×
2696
        }
×
2697
        newendpointslice, ok := newobj.(*discovery.EndpointSlice)
1✔
2698
        if !ok {
1✔
2699
                cont.log.Error("error processing Endpointslice object: ", newobj)
×
2700
                return
×
2701
        }
×
2702
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2703
                cont.doendpointSliceUpdatedDelay(oldendpointslice, newendpointslice)
×
2704
        } else {
1✔
2705
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
1✔
2706
        }
1✔
2707
}
2708

2709
func (cont *AciController) doendpointSliceUpdated(oldendpointslice *discovery.EndpointSlice,
2710
        newendpointslice *discovery.EndpointSlice) {
1✔
2711
        servicekey, valid := getServiceKey(newendpointslice)
1✔
2712
        if !valid {
1✔
2713
                return
×
2714
        }
×
2715
        oldIps := cont.getEndpointSliceIps(oldendpointslice)
1✔
2716
        newIps := cont.getEndpointSliceIps(newendpointslice)
1✔
2717
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2718
                cont.indexMutex.Lock()
1✔
2719
                cont.resolveServiceNamedPortFromEpSlice(oldendpointslice, servicekey, true)
1✔
2720
                cont.resolveServiceNamedPortFromEpSlice(newendpointslice, servicekey, false)
1✔
2721
                cont.queueIPNetPolUpdates(oldIps)
1✔
2722
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2723
                cont.queueIPNetPolUpdates(newIps)
1✔
2724
                cont.indexMutex.Unlock()
1✔
2725
        }
1✔
2726

2727
        if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) {
2✔
2728
                cont.queueEndpointSliceNetPolUpdates(oldendpointslice)
1✔
2729
                cont.queueEndpointSliceNetPolUpdates(newendpointslice)
1✔
2730
        }
1✔
2731
        cont.log.Debug("EndPointSlice Object Update: ", servicekey)
1✔
2732
        cont.queueServiceUpdateByKey(servicekey)
1✔
2733
}
2734

2735
func (cont *AciController) queueEndpointSliceNetPolUpdates(endpointslice *discovery.EndpointSlice) {
1✔
2736
        for _, endpoint := range endpointslice.Endpoints {
2✔
2737
                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" ||
1✔
2738
                        endpoint.TargetRef.Namespace == "" || endpoint.TargetRef.Name == "" {
2✔
2739
                        continue
1✔
2740
                }
2741
                if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
1✔
2742
                        continue
×
2743
                }
2744
                podkey := endpoint.TargetRef.Namespace + "/" + endpoint.TargetRef.Name
1✔
2745
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
2746
                ps := make(map[string]bool)
1✔
2747
                for _, npkey := range npkeys {
2✔
2748
                        cont.queueNetPolUpdateByKey(npkey)
1✔
2749
                }
1✔
2750
                // Process if the  any matching namedport wildcard policy is present
2751
                // ignore np already processed policies
2752
                cont.queueMatchingNamedNp(ps, podkey)
1✔
2753
        }
2754
}
2755

2756
func getServiceKey(endPointSlice *discovery.EndpointSlice) (string, bool) {
1✔
2757
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
1✔
2758
        if !ok {
1✔
2759
                return "", false
×
2760
        }
×
2761
        return endPointSlice.ObjectMeta.Namespace + "/" + serviceName, true
1✔
2762
}
2763

2764
func getServiceNameAndNs(endPointSlice *discovery.EndpointSlice) (string, string, bool) {
×
2765
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
×
2766
        if !ok {
×
2767
                return "", "", false
×
2768
        }
×
2769
        return serviceName, endPointSlice.ObjectMeta.Namespace, true
×
2770
}
2771

2772
// can be called with index lock
2773
func (sep *serviceEndpoint) UpdateServicesForNode(nodename string) {
1✔
2774
        cont := sep.cont
1✔
2775
        cache.ListAll(cont.endpointsIndexer, labels.Everything(),
1✔
2776
                func(endpointsobj interface{}) {
2✔
2777
                        endpoints := endpointsobj.(*v1.Endpoints)
1✔
2778
                        for _, subset := range endpoints.Subsets {
2✔
2779
                                for _, addr := range subset.Addresses {
2✔
2780
                                        if addr.NodeName != nil && *addr.NodeName == nodename {
2✔
2781
                                                servicekey, err :=
1✔
2782
                                                        cache.MetaNamespaceKeyFunc(endpointsobj.(*v1.Endpoints))
1✔
2783
                                                if err != nil {
1✔
2784
                                                        cont.log.Error("Could not create endpoints key: ", err)
×
2785
                                                        return
×
2786
                                                }
×
2787
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2788
                                                return
1✔
2789
                                        }
2790
                                }
2791
                        }
2792
                })
2793
}
2794

2795
func (seps *serviceEndpointSlice) UpdateServicesForNode(nodename string) {
1✔
2796
        // 1. List all the endpointslice and check for matching nodename
1✔
2797
        // 2. if it matches trigger the Service update and mark it visited
1✔
2798
        cont := seps.cont
1✔
2799
        visited := make(map[string]bool)
1✔
2800
        cache.ListAll(cont.endpointSliceIndexer, labels.Everything(),
1✔
2801
                func(endpointSliceobj interface{}) {
2✔
2802
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2803
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2804
                                if endpoint.NodeName != nil && *endpoint.NodeName == nodename {
2✔
2805
                                        servicekey, valid := getServiceKey(endpointSlices)
1✔
2806
                                        if !valid {
1✔
2807
                                                return
×
2808
                                        }
×
2809
                                        if _, ok := visited[servicekey]; !ok {
2✔
2810
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2811
                                                visited[servicekey] = true
1✔
2812
                                                return
1✔
2813
                                        }
1✔
2814
                                }
2815
                        }
2816
                })
2817
}
2818
func (cont *AciController) setNodeMap(nodeMap map[string]*metadata.ServiceEndpoint, nodeName string) {
1✔
2819
        nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
2820
        if !ok {
2✔
2821
                return
1✔
2822
        }
1✔
2823
        _, ok = cont.fabricPathForNode(nodeName)
1✔
2824
        if !ok {
2✔
2825
                return
1✔
2826
        }
1✔
2827
        nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
2828
}
2829

2830
// 2 cases when epslices corresponding to given service is presnt in delayedEpSlices:
2831
//  1. endpoint not present in delayedEpSlices of the service
2832
//  2. endpoint present in delayedEpSlices of the service but in not ready state
2833
//
2834
// indexMutex lock must be acquired before calling the function
2835
func (cont *AciController) isDelayedEndpoint(endpoint *discovery.Endpoint, svckey string) bool {
×
2836
        delayed := false
×
2837
        endpointips := cont.getEndpointSliceEpIps(endpoint)
×
2838
        for _, delayedepslices := range cont.delayedEpSlices {
×
2839
                if delayedepslices.ServiceKey == svckey {
×
2840
                        var found bool
×
2841
                        epslice := delayedepslices.OldEpSlice
×
2842
                        for ix := range epslice.Endpoints {
×
2843
                                epips := cont.getEndpointSliceEpIps(&epslice.Endpoints[ix])
×
2844
                                if reflect.DeepEqual(endpointips, epips) {
×
2845
                                        // case 2
×
2846
                                        if epslice.Endpoints[ix].Conditions.Ready != nil && !*epslice.Endpoints[ix].Conditions.Ready {
×
2847
                                                delayed = true
×
2848
                                        }
×
2849
                                        found = true
×
2850
                                }
2851
                        }
2852
                        // case 1
2853
                        if !found {
×
2854
                                delayed = true
×
2855
                        }
×
2856
                }
2857
        }
2858
        return delayed
×
2859
}
2860

2861
// set nodemap only if endoint is ready and not in delayedEpSlices
2862
func (cont *AciController) setNodeMapDelay(nodeMap map[string]*metadata.ServiceEndpoint,
2863
        endpoint *discovery.Endpoint, service *v1.Service) {
×
2864
        svckey, err := cache.MetaNamespaceKeyFunc(service)
×
2865
        if err != nil {
×
2866
                cont.log.Error("Could not create service key: ", err)
×
2867
                return
×
2868
        }
×
2869
        if cont.config.NoWaitForServiceEpReadiness ||
×
2870
                (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {
×
2871
                if endpoint.NodeName != nil && *endpoint.NodeName != "" {
×
2872
                        // donot setNodeMap for endpoint if:
×
2873
                        //   endpoint is newly added
×
2874
                        //   endpoint status changed from not ready to ready
×
2875
                        if !cont.isDelayedEndpoint(endpoint, svckey) {
×
2876
                                cont.setNodeMap(nodeMap, *endpoint.NodeName)
×
2877
                        }
×
2878
                }
2879
        }
2880
}
2881

2882
func (sep *serviceEndpoint) GetnodesMetadata(key string,
2883
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2884
        cont := sep.cont
1✔
2885
        endpointsobj, exists, err := cont.endpointsIndexer.GetByKey(key)
1✔
2886
        if err != nil {
1✔
2887
                cont.log.Error("Could not lookup endpoints for " +
×
2888
                        key + ": " + err.Error())
×
2889
        }
×
2890
        if exists && endpointsobj != nil {
2✔
2891
                endpoints := endpointsobj.(*v1.Endpoints)
1✔
2892
                for _, subset := range endpoints.Subsets {
2✔
2893
                        for _, addr := range subset.Addresses {
2✔
2894
                                if addr.NodeName == nil {
2✔
2895
                                        continue
1✔
2896
                                }
2897
                                cont.setNodeMap(nodeMap, *addr.NodeName)
1✔
2898
                        }
2899
                }
2900
        }
2901
        cont.log.Info("NodeMap: ", nodeMap)
1✔
2902
}
2903

2904
func (seps *serviceEndpointSlice) GetnodesMetadata(key string,
2905
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2906
        cont := seps.cont
1✔
2907
        // 1. Get all the Endpoint slices matching the label service-name
1✔
2908
        // 2. update the node map matching with endpoints nodes name
1✔
2909
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2910
        selector := labels.SelectorFromSet(label)
1✔
2911
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2912
                func(endpointSliceobj interface{}) {
2✔
2913
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2914
                        for ix := range endpointSlices.Endpoints {
2✔
2915
                                if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2916
                                        cont.setNodeMapDelay(nodeMap, &endpointSlices.Endpoints[ix], service)
×
2917
                                } else if cont.config.NoWaitForServiceEpReadiness ||
1✔
2918
                                        (endpointSlices.Endpoints[ix].Conditions.Ready != nil && *endpointSlices.Endpoints[ix].Conditions.Ready) {
2✔
2919
                                        if endpointSlices.Endpoints[ix].NodeName != nil && *endpointSlices.Endpoints[ix].NodeName != "" {
2✔
2920
                                                cont.setNodeMap(nodeMap, *endpointSlices.Endpoints[ix].NodeName)
1✔
2921
                                        }
1✔
2922
                                }
2923
                        }
2924
                })
2925
        cont.log.Debug("NodeMap: ", nodeMap)
1✔
2926
}
2927

2928
func (sep *serviceEndpoint) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2929
        cont := sep.cont
1✔
2930
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
2931
        if err != nil {
1✔
2932
                serviceLogger(cont.log, service).
×
2933
                        Error("Could not create service key: ", err)
×
2934
                return false
×
2935
        }
×
2936
        endpointsobj, _, err := cont.endpointsIndexer.GetByKey(key)
1✔
2937
        if err != nil {
1✔
2938
                cont.log.Error("Could not lookup endpoints for " +
×
2939
                        key + ": " + err.Error())
×
2940
                return false
×
2941
        }
×
2942
        if endpointsobj != nil {
2✔
2943
                for _, subset := range endpointsobj.(*v1.Endpoints).Subsets {
2✔
2944
                        for _, addr := range subset.Addresses {
2✔
2945
                                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" {
1✔
2946
                                        continue
×
2947
                                }
2948
                                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(),
1✔
2949
                                        addr.TargetRef.Name))
1✔
2950
                        }
2951
                }
2952
        }
2953
        return true
1✔
2954
}
2955

2956
func (seps *serviceEndpointSlice) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2957
        cont := seps.cont
1✔
2958
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2959
        selector := labels.SelectorFromSet(label)
1✔
2960
        epcount := 0
1✔
2961
        childs := make(map[string]struct{})
1✔
2962
        var exists = struct{}{}
1✔
2963
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2964
                func(endpointSliceobj interface{}) {
2✔
2965
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2966
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2967
                                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
1✔
2968
                                        continue
×
2969
                                }
2970
                                epcount++
1✔
2971
                                childs[endpoint.TargetRef.Name] = exists
1✔
2972
                                cont.log.Debug("EndPoint added: ", endpoint.TargetRef.Name)
1✔
2973
                        }
2974
                })
2975
        for child := range childs {
2✔
2976
                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(), child))
1✔
2977
        }
1✔
2978
        return epcount != 0
1✔
2979
}
2980

2981
func getProtocolStr(proto v1.Protocol) string {
1✔
2982
        var protostring string
1✔
2983
        switch proto {
1✔
2984
        case v1.ProtocolUDP:
1✔
2985
                protostring = "udp"
1✔
2986
        case v1.ProtocolTCP:
1✔
2987
                protostring = "tcp"
1✔
2988
        case v1.ProtocolSCTP:
×
2989
                protostring = "sctp"
×
2990
        default:
×
2991
                protostring = "tcp"
×
2992
        }
2993
        return protostring
1✔
2994
}
2995

2996
func (cont *AciController) removeIpFromIngressIPList(ingressIps *[]net.IP, ip net.IP) {
×
2997
        cont.returnServiceIps([]net.IP{ip})
×
2998
        index := -1
×
2999
        for i, v := range *ingressIps {
×
3000
                if v.Equal(ip) {
×
3001
                        index = i
×
3002
                        break
×
3003
                }
3004
        }
3005
        if index == -1 {
×
3006
                return
×
3007
        }
×
3008
        *ingressIps = append((*ingressIps)[:index], (*ingressIps)[index+1:]...)
×
3009
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc