• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11453

08 Dec 2025 09:35AM UTC coverage: 63.152% (+0.02%) from 63.133%
11453

push

travis-pro

maheshkurund
Remove stale opflexOdevs

Sometimes after a TOR reboot, it is observed that we don't receive
opflexOdev delete and thus its entry remains in controller's cache.
This stale cache affects the way we get aci-annotation for the node.
As part of this change, we remove stale odev and that is determined
based on its modTs value.

37 of 41 new or added lines in 1 file covered. (90.24%)

7 existing lines in 2 files now uncovered.

13406 of 21228 relevant lines covered (63.15%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.19
/pkg/controller/services.go
1
// Copyright 2016 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "errors"
19
        "fmt"
20
        "net"
21
        "reflect"
22
        "regexp"
23
        "sort"
24
        "strconv"
25
        "strings"
26
        "time"
27

28
        "github.com/noironetworks/aci-containers/pkg/apicapi"
29
        "github.com/noironetworks/aci-containers/pkg/metadata"
30
        "github.com/noironetworks/aci-containers/pkg/util"
31
        "github.com/sirupsen/logrus"
32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
        "k8s.io/apimachinery/pkg/fields"
36
        "k8s.io/apimachinery/pkg/labels"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
)
40

41
// DefaultServiceContractScope is the default service contract scope value.
42
const DefaultServiceContractScope = "context"
43

44
// DefaultServiceExtSubNetShared is the default for whether the service
// external subnet scope enables shared security.
45
const DefaultServiceExtSubNetShared = false
46

47
func (cont *AciController) initEndpointsInformerFromClient(
48
        kubeClient kubernetes.Interface) {
×
49
        cont.initEndpointsInformerBase(
×
50
                cache.NewListWatchFromClient(
×
51
                        kubeClient.CoreV1().RESTClient(), "endpoints",
×
52
                        metav1.NamespaceAll, fields.Everything()))
×
53
}
×
54

55
func (cont *AciController) initEndpointSliceInformerFromClient(
56
        kubeClient kubernetes.Interface) {
×
57
        cont.initEndpointSliceInformerBase(
×
58
                cache.NewListWatchFromClient(
×
59
                        kubeClient.DiscoveryV1().RESTClient(), "endpointslices",
×
60
                        metav1.NamespaceAll, fields.Everything()))
×
61
}
×
62

63
func (cont *AciController) initEndpointSliceInformerBase(listWatch *cache.ListWatch) {
1✔
64
        cont.endpointSliceIndexer, cont.endpointSliceInformer = cache.NewIndexerInformer(
1✔
65
                listWatch, &discovery.EndpointSlice{}, 0,
1✔
66
                cache.ResourceEventHandlerFuncs{
1✔
67
                        AddFunc: func(obj interface{}) {
2✔
68
                                cont.endpointSliceAdded(obj)
1✔
69
                        },
1✔
70
                        UpdateFunc: func(oldobj interface{}, newobj interface{}) {
1✔
71
                                cont.endpointSliceUpdated(oldobj, newobj)
1✔
72
                        },
1✔
73
                        DeleteFunc: func(obj interface{}) {
1✔
74
                                cont.endpointSliceDeleted(obj)
1✔
75
                        },
1✔
76
                },
77
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
78
        )
79
}
80

81
func (cont *AciController) initEndpointsInformerBase(listWatch *cache.ListWatch) {
1✔
82
        cont.endpointsIndexer, cont.endpointsInformer = cache.NewIndexerInformer(
1✔
83
                listWatch, &v1.Endpoints{}, 0,
1✔
84
                cache.ResourceEventHandlerFuncs{
1✔
85
                        AddFunc: func(obj interface{}) {
2✔
86
                                cont.endpointsAdded(obj)
1✔
87
                        },
1✔
88
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
89
                                cont.endpointsUpdated(old, new)
1✔
90
                        },
1✔
91
                        DeleteFunc: func(obj interface{}) {
1✔
92
                                cont.endpointsDeleted(obj)
1✔
93
                        },
1✔
94
                },
95
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
96
        )
97
}
98

99
func (cont *AciController) initServiceInformerFromClient(
100
        kubeClient *kubernetes.Clientset) {
×
101
        cont.initServiceInformerBase(
×
102
                cache.NewListWatchFromClient(
×
103
                        kubeClient.CoreV1().RESTClient(), "services",
×
104
                        metav1.NamespaceAll, fields.Everything()))
×
105
}
×
106

107
func (cont *AciController) initServiceInformerBase(listWatch *cache.ListWatch) {
1✔
108
        cont.serviceIndexer, cont.serviceInformer = cache.NewIndexerInformer(
1✔
109
                listWatch, &v1.Service{}, 0,
1✔
110
                cache.ResourceEventHandlerFuncs{
1✔
111
                        AddFunc: func(obj interface{}) {
2✔
112
                                cont.serviceAdded(obj)
1✔
113
                        },
1✔
114
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
115
                                cont.serviceUpdated(old, new)
1✔
116
                        },
1✔
117
                        DeleteFunc: func(obj interface{}) {
×
118
                                cont.serviceDeleted(obj)
×
119
                        },
×
120
                },
121
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
122
        )
123
}
124

125
func serviceLogger(log *logrus.Logger, as *v1.Service) *logrus.Entry {
1✔
126
        return log.WithFields(logrus.Fields{
1✔
127
                "namespace": as.ObjectMeta.Namespace,
1✔
128
                "name":      as.ObjectMeta.Name,
1✔
129
                "type":      as.Spec.Type,
1✔
130
        })
1✔
131
}
1✔
132

133
func (cont *AciController) queueIPNetPolUpdates(ips map[string]bool) {
1✔
134
        for ipStr := range ips {
2✔
135
                ip := net.ParseIP(ipStr)
1✔
136
                if ip == nil {
1✔
137
                        continue
×
138
                }
139
                entries, err := cont.netPolSubnetIndex.ContainingNetworks(ip)
1✔
140
                if err != nil {
1✔
141
                        cont.log.Error("Corrupted network policy IP index, err: ", err)
×
142
                        return
×
143
                }
×
144
                for _, entry := range entries {
2✔
145
                        for npkey := range entry.(*ipIndexEntry).keys {
2✔
146
                                cont.queueNetPolUpdateByKey(npkey)
1✔
147
                        }
1✔
148
                }
149
        }
150
}
151

152
func (cont *AciController) queuePortNetPolUpdates(ports map[string]targetPort) {
1✔
153
        for portkey := range ports {
2✔
154
                entry := cont.targetPortIndex[portkey]
1✔
155
                if entry == nil {
2✔
156
                        continue
1✔
157
                }
158
                for npkey := range entry.networkPolicyKeys {
2✔
159
                        cont.queueNetPolUpdateByKey(npkey)
1✔
160
                }
1✔
161
        }
162
}
163

164
func (cont *AciController) queueNetPolForEpAddrs(addrs []v1.EndpointAddress) {
1✔
165
        for _, addr := range addrs {
2✔
166
                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" ||
1✔
167
                        addr.TargetRef.Namespace == "" || addr.TargetRef.Name == "" {
2✔
168
                        continue
1✔
169
                }
170
                podkey := addr.TargetRef.Namespace + "/" + addr.TargetRef.Name
1✔
171
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
172
                ps := make(map[string]bool)
1✔
173
                for _, npkey := range npkeys {
2✔
174
                        cont.queueNetPolUpdateByKey(npkey)
1✔
175
                        ps[npkey] = true
1✔
176
                }
1✔
177
                // Process if the  any matching namedport wildcard policy is present
178
                // ignore np already processed policies
179
                cont.queueMatchingNamedNp(ps, podkey)
1✔
180
        }
181
}
182

183
func (cont *AciController) queueMatchingNamedNp(served map[string]bool, podkey string) {
1✔
184
        cont.indexMutex.Lock()
1✔
185
        for npkey := range cont.nmPortNp {
2✔
186
                if _, ok := served[npkey]; !ok {
2✔
187
                        if cont.checkPodNmpMatchesNp(npkey, podkey) {
2✔
188
                                cont.queueNetPolUpdateByKey(npkey)
1✔
189
                        }
1✔
190
                }
191
        }
192
        cont.indexMutex.Unlock()
1✔
193
}
194
func (cont *AciController) queueEndpointsNetPolUpdates(endpoints *v1.Endpoints) {
1✔
195
        for _, subset := range endpoints.Subsets {
2✔
196
                cont.queueNetPolForEpAddrs(subset.Addresses)
1✔
197
                cont.queueNetPolForEpAddrs(subset.NotReadyAddresses)
1✔
198
        }
1✔
199
}
200

201
func (cont *AciController) returnServiceIps(ips []net.IP) {
1✔
202
        for _, ip := range ips {
2✔
203
                if ip.To4() != nil {
2✔
204
                        cont.serviceIps.DeallocateIp(ip)
1✔
205
                } else if ip.To16() != nil {
3✔
206
                        cont.serviceIps.DeallocateIp(ip)
1✔
207
                }
1✔
208
        }
209
}
210

211
func returnIps(pool *netIps, ips []net.IP) {
1✔
212
        for _, ip := range ips {
2✔
213
                if ip.To4() != nil {
2✔
214
                        pool.V4.AddIp(ip)
1✔
215
                } else if ip.To16() != nil {
3✔
216
                        pool.V6.AddIp(ip)
1✔
217
                }
1✔
218
        }
219
}
220

221
func (cont *AciController) staticMonPolName() string {
1✔
222
        return cont.aciNameForKey("monPol", cont.env.ServiceBd())
1✔
223
}
1✔
224

225
func (cont *AciController) staticMonPolDn() string {
1✔
226
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
227
                return fmt.Sprintf("uni/tn-%s/ipslaMonitoringPol-%s",
1✔
228
                        cont.config.AciVrfTenant, cont.staticMonPolName())
1✔
229
        }
1✔
230
        return ""
×
231
}
232

233
func (cont *AciController) staticServiceObjs() apicapi.ApicSlice {
1✔
234
        var serviceObjs apicapi.ApicSlice
1✔
235

1✔
236
        // Service bridge domain
1✔
237
        bdName := cont.aciNameForKey("bd", cont.env.ServiceBd())
1✔
238
        bd := apicapi.NewFvBD(cont.config.AciVrfTenant, bdName)
1✔
239
        if apicapi.ApicVersion >= "6.0(4c)" {
1✔
240
                bd.SetAttr("serviceBdRoutingDisable", "yes")
×
241
        }
×
242
        bd.SetAttr("arpFlood", "yes")
1✔
243
        bd.SetAttr("ipLearning", "no")
1✔
244
        bd.SetAttr("unkMacUcastAct", cont.config.UnknownMacUnicastAction)
1✔
245
        bdToOut := apicapi.NewRsBdToOut(bd.GetDn(), cont.config.AciL3Out)
1✔
246
        bd.AddChild(bdToOut)
1✔
247
        bdToVrf := apicapi.NewRsCtx(bd.GetDn(), cont.config.AciVrf)
1✔
248
        bd.AddChild(bdToVrf)
1✔
249

1✔
250
        bdn := bd.GetDn()
1✔
251
        for _, cidr := range cont.config.NodeServiceSubnets {
2✔
252
                sn := apicapi.NewFvSubnet(bdn, cidr)
1✔
253
                bd.AddChild(sn)
1✔
254
        }
1✔
255
        serviceObjs = append(serviceObjs, bd)
1✔
256

1✔
257
        // Service IP SLA monitoring policy
1✔
258
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
259
                monPol := apicapi.NewFvIPSLAMonitoringPol(cont.config.AciVrfTenant,
1✔
260
                        cont.staticMonPolName())
1✔
261
                monPol.SetAttr("slaFrequency",
1✔
262
                        strconv.Itoa(cont.config.AciServiceMonitorInterval))
1✔
263
                serviceObjs = append(serviceObjs, monPol)
1✔
264
        }
1✔
265

266
        return serviceObjs
1✔
267
}
268

269
func (cont *AciController) initStaticServiceObjs() {
1✔
270
        cont.apicConn.WriteApicObjects(cont.config.AciPrefix+"_service_static",
1✔
271
                cont.staticServiceObjs())
1✔
272
}
1✔
273

274
func (cont *AciController) updateServicesForNode(nodename string) {
1✔
275
        cont.serviceEndPoints.UpdateServicesForNode(nodename)
1✔
276
}
1✔
277

278
// must have index lock
279
func (cont *AciController) getActiveFabricPathDn(node string) string {
×
280
        var fabricPathDn string
×
281
        sz := len(cont.nodeOpflexDevice[node])
×
282
        for i := range cont.nodeOpflexDevice[node] {
×
283
                device := cont.nodeOpflexDevice[node][sz-1-i]
×
284
                if device.GetAttrStr("state") == "connected" {
×
285
                        fabricPathDn = device.GetAttrStr("fabricPathDn")
×
286
                        break
×
287
                }
288
        }
289
        return fabricPathDn
×
290
}
291

292
func (cont *AciController) getOpflexOdevCount(node string) (int, string) {
×
293
        fabricPathDnCount := make(map[string]int)
×
294
        var maxCount int
×
295
        var fabricPathDn string
×
296
        for _, device := range cont.nodeOpflexDevice[node] {
×
297
                fabricpathdn := device.GetAttrStr("fabricPathDn")
×
298
                count, ok := fabricPathDnCount[fabricpathdn]
×
299
                if ok {
×
300
                        fabricPathDnCount[fabricpathdn] = count + 1
×
301
                } else {
×
302
                        fabricPathDnCount[fabricpathdn] = 1
×
303
                }
×
304
                if fabricPathDnCount[fabricpathdn] >= maxCount {
×
305
                        maxCount = fabricPathDnCount[fabricpathdn]
×
306
                        fabricPathDn = fabricpathdn
×
307
                }
×
308
        }
309
        return maxCount, fabricPathDn
×
310
}
311

312
func deleteDevicesFromList(delDevices, devices apicapi.ApicSlice) apicapi.ApicSlice {
×
313
        var newDevices apicapi.ApicSlice
×
314
        for _, device := range devices {
×
315
                found := false
×
316
                for _, delDev := range delDevices {
×
317
                        if reflect.DeepEqual(delDev, device) {
×
318
                                found = true
×
319
                        }
×
320
                }
321
                if !found {
×
322
                        newDevices = append(newDevices, device)
×
323
                }
×
324
        }
325
        return newDevices
×
326
}
327

328
func (cont *AciController) getAciPodSubnet(pod string) (string, error) {
×
329
        podslice := strings.Split(pod, "-")
×
330
        if len(podslice) < 2 {
×
331
                return "", fmt.Errorf("Failed to get podid from pod")
×
332
        }
×
333
        podid := podslice[1]
×
334
        var subnet string
×
335
        args := []string{
×
336
                "query-target=self",
×
337
        }
×
338
        url := fmt.Sprintf("/api/node/mo/uni/controller/setuppol/setupp-%s.json?%s", podid, strings.Join(args, "&"))
×
339
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
340
        if err != nil {
×
341
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
342
                return subnet, err
×
343
        }
×
344
        for _, obj := range apicresp.Imdata {
×
345
                for _, body := range obj {
×
346
                        tepPool, ok := body.Attributes["tepPool"].(string)
×
347
                        if ok {
×
348
                                subnet = tepPool
×
349
                                break
×
350
                        }
351
                }
352
        }
353
        return subnet, nil
×
354
}
355

356
func (cont *AciController) isSingleOpflexOdev(fabricPathDn string) (bool, error) {
×
357
        pathSlice := strings.Split(fabricPathDn, "/")
×
358
        if len(pathSlice) > 2 {
×
359
                path := pathSlice[2]
×
360
                // topology/<aci_pod_name>/paths-<id>/pathep-<iface> - fabricPathDn if
×
361
                // host is connnected via single link
×
362
                // topology/<aci_pod_name>/protpaths-<id_1>-<id_2>/pathep-<iface> - fabricPathDn if
×
363
                // host is connected to vpc pair
×
364
                if strings.Contains(path, "protpaths-") {
×
365
                        args := []string{
×
366
                                "query-target=self",
×
367
                        }
×
368
                        url := fmt.Sprintf("/api/node/class/vpcDom.json?%s", strings.Join(args, "&"))
×
369
                        apicresp, err := cont.apicConn.GetApicResponse(url)
×
370
                        if err != nil {
×
371
                                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
372
                                return false, err
×
373
                        }
×
374
                        nodeIdSlice := strings.Split(path, "-")
×
375
                        if len(nodeIdSlice) != 3 {
×
376
                                cont.log.Error("Invalid fabricPathDn ", fabricPathDn)
×
377
                                return false, fmt.Errorf("Invalid path in fabricPathDn %s", fabricPathDn)
×
378
                        }
×
379
                        // As host is connected to vpc pair, check if the status of any of the vpcDom is down
380
                        upCount := 0
×
381
                        for i, nodeId := range nodeIdSlice {
×
382
                                instDn := fmt.Sprintf("topology/%s/node-%s/sys/vpc/inst", pathSlice[1], nodeId)
×
383
                                if i == 0 {
×
384
                                        continue
×
385
                                }
386
                                for _, obj := range apicresp.Imdata {
×
387
                                        for _, body := range obj {
×
388
                                                dn, ok := body.Attributes["dn"].(string)
×
389
                                                if ok && strings.Contains(dn, instDn) {
×
390
                                                        peerSt, ok := body.Attributes["peerSt"].(string)
×
391
                                                        if ok && peerSt == "up" {
×
392
                                                                upCount++
×
393
                                                        }
×
394
                                                }
395
                                        }
396
                                }
397
                        }
398
                        if upCount < 2 {
×
399
                                return true, nil
×
400
                        }
×
401
                        return false, nil
×
402
                } else {
×
403
                        return true, nil
×
404
                }
×
405
        }
406
        return false, fmt.Errorf("Invalid fabricPathDn %s ", fabricPathDn)
×
407
}
408

409
func (cont *AciController) createAciPodAnnotation(node string) (aciPodAnnot, error) {
×
410
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
411
        nodeAciPodAnnot := cont.nodeACIPod[node]
×
412
        isSingleOdev := false
×
413
        if odevCount == 1 {
×
414
                var err error
×
415
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
416
                if err != nil {
×
417
                        cont.log.Error(err)
×
418
                        return nodeAciPodAnnot, err
×
419
                }
×
420
        }
421
        if (odevCount == 0) ||
×
422
                (odevCount == 1 && !isSingleOdev) {
×
423
                if nodeAciPodAnnot.aciPod != "none" {
×
424
                        if nodeAciPodAnnot.disconnectTime.IsZero() {
×
425
                                nodeAciPodAnnot.disconnectTime = time.Now()
×
426
                        } else {
×
427
                                currentTime := time.Now()
×
428
                                diff := currentTime.Sub(nodeAciPodAnnot.disconnectTime)
×
429
                                if diff.Seconds() > float64(cont.config.OpflexDeviceReconnectWaitTimeout) {
×
430
                                        nodeAciPodAnnot.aciPod = "none"
×
431
                                        nodeAciPodAnnot.disconnectTime = time.Time{}
×
432
                                }
×
433
                        }
434
                }
435
                return nodeAciPodAnnot, nil
×
436
        } else if (odevCount == 2) ||
×
437
                (odevCount == 1 && isSingleOdev) {
×
438
                // when there is already a connected opflex device,
×
439
                // fabricPathDn will have latest pod iformation
×
440
                // and annotation will be in the form pod-<podid>-<subnet of pod>
×
441
                nodeAciPodAnnot.disconnectTime = time.Time{}
×
442
                nodeAciPod := nodeAciPodAnnot.aciPod
×
443
                pathSlice := strings.Split(fabricPathDn, "/")
×
444
                if len(pathSlice) > 1 {
×
445
                        pod := pathSlice[1]
×
446

×
447
                        // when there is difference in pod info avaliable from fabricPathDn
×
448
                        // and what we have in cache, update info in cache and change annotation on node
×
449
                        if !strings.Contains(nodeAciPod, pod) {
×
450
                                subnet, err := cont.getAciPodSubnet(pod)
×
451
                                if err != nil {
×
452
                                        cont.log.Error("Failed to get subnet of aci pod ", err.Error())
×
453
                                        return nodeAciPodAnnot, err
×
454
                                } else {
×
455
                                        nodeAciPodAnnot.aciPod = pod + "-" + subnet
×
456
                                        return nodeAciPodAnnot, nil
×
457
                                }
×
458
                        } else {
×
459
                                return nodeAciPodAnnot, nil
×
460
                        }
×
461
                } else {
×
462
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
463
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
464
                }
×
465
        }
466
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
467
}
468

469
func (cont *AciController) createNodeAciPodAnnotation(node string) (aciPodAnnot, error) {
×
470
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
471
        nodeAciPodAnnot := cont.nodeACIPodAnnot[node]
×
472
        isSingleOdev := false
×
473
        if odevCount == 1 {
×
474
                var err error
×
475
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
476
                if err != nil {
×
477
                        cont.log.Error(err)
×
478
                        return nodeAciPodAnnot, err
×
479
                }
×
480
        }
481
        if (odevCount == 0) ||
×
482
                (odevCount == 1 && !isSingleOdev) {
×
483
                if nodeAciPodAnnot.aciPod != "none" {
×
484
                        nodeAciPodAnnot.aciPod = "none"
×
485
                }
×
486
                return nodeAciPodAnnot, nil
×
487
        } else if (odevCount == 2) ||
×
488
                (odevCount == 1 && isSingleOdev) {
×
489
                pathSlice := strings.Split(fabricPathDn, "/")
×
490
                if len(pathSlice) > 1 {
×
491

×
492
                        nodeAciPodAnnot.aciPod = pathSlice[1]
×
493
                        return nodeAciPodAnnot, nil
×
494
                } else {
×
495
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
496
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
497
                }
×
498
        }
499
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
500
}
501

502
func (cont *AciController) checkChangeOfOpflexOdevAciPod() {
×
503
        var nodeAnnotationUpdates []string
×
504
        cont.apicConn.SyncMutex.Lock()
×
505
        syncDone := cont.apicConn.SyncDone
×
506
        cont.apicConn.SyncMutex.Unlock()
×
507

×
508
        if !syncDone {
×
509
                return
×
510
        }
×
511

512
        cont.indexMutex.Lock()
×
513
        for node := range cont.nodeACIPodAnnot {
×
514
                annot, err := cont.createNodeAciPodAnnotation(node)
×
515
                if err != nil {
×
516
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
517
                                now := time.Now()
×
518
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
519
                                        annot.lastErrorTime = now
×
520
                                        cont.nodeACIPodAnnot[node] = annot
×
521
                                        cont.log.Error(err.Error())
×
522
                                }
×
523
                        } else {
×
524
                                cont.log.Error(err.Error())
×
525
                        }
×
526
                } else {
×
527
                        if annot != cont.nodeACIPodAnnot[node] {
×
528
                                cont.nodeACIPodAnnot[node] = annot
×
529
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
530
                        }
×
531
                }
532
        }
533
        cont.indexMutex.Unlock()
×
534
        if len(nodeAnnotationUpdates) > 0 {
×
535
                for _, updatednode := range nodeAnnotationUpdates {
×
536
                        go cont.env.NodeAnnotationChanged(updatednode)
×
537
                }
×
538
        }
539
}
540

541
func (cont *AciController) checkChangeOfOdevAciPod() {
×
542
        var nodeAnnotationUpdates []string
×
543
        cont.apicConn.SyncMutex.Lock()
×
544
        syncDone := cont.apicConn.SyncDone
×
545
        cont.apicConn.SyncMutex.Unlock()
×
546

×
547
        if !syncDone {
×
548
                return
×
549
        }
×
550

551
        cont.indexMutex.Lock()
×
552
        for node := range cont.nodeACIPod {
×
553
                annot, err := cont.createAciPodAnnotation(node)
×
554
                if err != nil {
×
555
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
556
                                now := time.Now()
×
557
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
558
                                        annot.lastErrorTime = now
×
559
                                        cont.nodeACIPod[node] = annot
×
560
                                        cont.log.Error(err.Error())
×
561
                                }
×
562
                        } else {
×
563
                                cont.log.Error(err.Error())
×
564
                        }
×
565
                } else {
×
566
                        if annot != cont.nodeACIPod[node] {
×
567
                                cont.nodeACIPod[node] = annot
×
568
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
569
                        }
×
570
                }
571
        }
572
        cont.indexMutex.Unlock()
×
573
        if len(nodeAnnotationUpdates) > 0 {
×
574
                for _, updatednode := range nodeAnnotationUpdates {
×
575
                        go cont.env.NodeAnnotationChanged(updatednode)
×
576
                }
×
577
        }
578
}
579

580
func (cont *AciController) getStaleOpflexODevs(devices apicapi.ApicSlice) apicapi.ApicSlice {
1✔
581
        var staleDevices apicapi.ApicSlice
1✔
582
        deviceMap := make(map[string]apicapi.ApicObject)
1✔
583

1✔
584
        // Group devices by DN prefix and track the latest one by modTs
1✔
585
        for _, device := range devices {
2✔
586
                dn := device.GetAttrStr("dn")
1✔
587
                dnPrefix := extractDNPrefix(dn)
1✔
588

1✔
589
                if existing, exists := deviceMap[dnPrefix]; exists {
2✔
590
                        // Mark device as stale if an existing device with the same
1✔
591
                        // DN prefix has a newer modTs.
1✔
592
                        if isNewer(device, existing) {
2✔
593
                                staleDevices = append(staleDevices, existing)
1✔
594
                                deviceMap[dnPrefix] = device
1✔
595
                        } else {
2✔
596
                                staleDevices = append(staleDevices, device)
1✔
597
                        }
1✔
598
                } else {
1✔
599
                        deviceMap[dnPrefix] = device
1✔
600
                }
1✔
601
        }
602
        return staleDevices
1✔
603
}
604

605
// extractDNPrefix returns dn with its final "/"-separated component (the
// devId) removed. A dn that contains no "/" is returned unchanged.
func extractDNPrefix(dn string) string {
	if cut := strings.LastIndexByte(dn, '/'); cut >= 0 {
		return dn[:cut]
	}
	return dn
}
613

614
// Helper function to compare if device1 is newer than device2.
615
func isNewer(device1, device2 apicapi.ApicObject) bool {
1✔
616
        modTs1 := device1.GetAttrStr("modTs")
1✔
617
        modTs2 := device2.GetAttrStr("modTs")
1✔
618

1✔
619
        t1, err1 := time.Parse(time.RFC3339Nano, modTs1)
1✔
620
        t2, err2 := time.Parse(time.RFC3339Nano, modTs2)
1✔
621
        if err1 != nil || err2 != nil {
2✔
622
                // Fallback to string comparison if parsing fails.
1✔
623
                return modTs1 > modTs2
1✔
624
        }
1✔
625
        return t1.After(t2)
1✔
626
}
627

628
func (cont *AciController) deleteOldOpflexDevices() {
1✔
629
        var nodeUpdates []string
1✔
630
        cont.indexMutex.Lock()
1✔
631
        for node, devices := range cont.nodeOpflexDevice {
1✔
632
                var delDevices apicapi.ApicSlice
×
633
                fabricPathDn := cont.getActiveFabricPathDn(node)
×
634
                if fabricPathDn != "" {
×
NEW
635
                        staleDevices := cont.getStaleOpflexODevs(devices)
×
636
                        for _, device := range devices {
×
637
                                if device.GetAttrStr("delete") == "true" && device.GetAttrStr("fabricPathDn") != fabricPathDn {
×
638
                                        deleteTimeStr := device.GetAttrStr("deleteTime")
×
639
                                        deleteTime, err := time.Parse(time.RFC3339, deleteTimeStr)
×
640
                                        if err != nil {
×
641
                                                cont.log.Error("Failed to parse opflex device delete time: ", err)
×
642
                                                continue
×
643
                                        }
644
                                        now := time.Now()
×
645
                                        diff := now.Sub(deleteTime)
×
646
                                        if diff.Seconds() >= cont.config.OpflexDeviceDeleteTimeout {
×
647
                                                delDevices = append(delDevices, device)
×
648
                                        }
×
649
                                }
650
                        }
NEW
651
                        delDevices = append(delDevices, staleDevices...)
×
652
                        if len(delDevices) > 0 {
×
653
                                newDevices := deleteDevicesFromList(delDevices, devices)
×
654
                                cont.nodeOpflexDevice[node] = newDevices
×
655
                                cont.log.Info("Opflex device list for node ", node, " after deleting stale entries: ", cont.nodeOpflexDevice[node])
×
656
                                if len(newDevices) == 0 {
×
657
                                        delete(cont.nodeOpflexDevice, node)
×
658
                                }
×
659
                                nodeUpdates = append(nodeUpdates, node)
×
660
                        }
661
                }
662
        }
663
        cont.indexMutex.Unlock()
1✔
664
        if len(nodeUpdates) > 0 {
1✔
665
                cont.postOpflexDeviceDelete(nodeUpdates)
×
666
        }
×
667
}
668

669
// must have index lock
670
func (cont *AciController) setDeleteFlagForOldDevices(node, fabricPathDn string) {
1✔
671
        for _, device := range cont.nodeOpflexDevice[node] {
2✔
672
                if device.GetAttrStr("fabricPathDn") != fabricPathDn {
1✔
673
                        t := time.Now()
×
674
                        device.SetAttr("delete", "true")
×
675
                        device.SetAttr("deleteTime", t.Format(time.RFC3339))
×
676
                }
×
677
        }
678
}
679

680
// must have index lock
681
func (cont *AciController) fabricPathForNode(name string) (string, bool) {
1✔
682
        sz := len(cont.nodeOpflexDevice[name])
1✔
683
        for i := range cont.nodeOpflexDevice[name] {
2✔
684
                device := cont.nodeOpflexDevice[name][sz-1-i]
1✔
685
                deviceState := device.GetAttrStr("state")
1✔
686
                if deviceState == "connected" {
2✔
687
                        if deviceState != device.GetAttrStr("prevState") {
2✔
688
                                cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Info("Processing fabric path for node ",
1✔
689
                                        "when connected device state is found")
1✔
690
                                device.SetAttr("prevState", deviceState)
1✔
691
                        }
1✔
692
                        fabricPathDn := device.GetAttrStr("fabricPathDn")
1✔
693
                        cont.setDeleteFlagForOldDevices(name, fabricPathDn)
1✔
694
                        return fabricPathDn, true
1✔
695
                } else {
1✔
696
                        device.SetAttr("prevState", deviceState)
1✔
697
                }
1✔
698
        }
699
        if sz > 0 {
2✔
700
                // When the opflex-device for a node changes, for example during a live migration,
1✔
701
                // we end up with both the old and the new device objects till the old object
1✔
702
                // ages out on APIC. The new object is at end of the devices list (see opflexDeviceChanged),
1✔
703
                // so we return the fabricPathDn of the last opflex-device.
1✔
704
                cont.fabricPathLogger(cont.nodeOpflexDevice[name][sz-1].GetAttrStr("hostName"),
1✔
705
                        cont.nodeOpflexDevice[name][sz-1]).Info("Processing fabricPathDn for node")
1✔
706
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("fabricPathDn"), true
1✔
707
        }
1✔
708
        return "", false
1✔
709
}
710

711
// must have index lock
712
func (cont *AciController) deviceMacForNode(name string) (string, bool) {
1✔
713
        sz := len(cont.nodeOpflexDevice[name])
1✔
714
        if sz > 0 {
2✔
715
                // When the opflex-device for a node changes, for example when the
1✔
716
                // node is recreated, we end up with both the old and the new
1✔
717
                // device objects till the old object ages out on APIC. The
1✔
718
                // new object is at end of the devices list (see
1✔
719
                // opflexDeviceChanged), so we return the MAC address of the
1✔
720
                // last opflex-device.
1✔
721
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("mac"), true
1✔
722
        }
1✔
723
        return "", false
1✔
724
}
725

726
func apicRedirectDst(rpDn string, ip string, mac string,
727
        descr string, healthGroupDn string, enablePbrTracking bool) apicapi.ApicObject {
1✔
728
        dst := apicapi.NewVnsRedirectDest(rpDn, ip, mac).SetAttr("descr", descr)
1✔
729
        if healthGroupDn != "" && enablePbrTracking {
2✔
730
                dst.AddChild(apicapi.NewVnsRsRedirectHealthGroup(dst.GetDn(),
1✔
731
                        healthGroupDn))
1✔
732
        }
1✔
733
        return dst
1✔
734
}
735

736
func (cont *AciController) apicRedirectPol(name string, tenantName string, nodes []string,
737
        nodeMap map[string]*metadata.ServiceEndpoint,
738
        monPolDn string, enablePbrTracking bool) (apicapi.ApicObject, string) {
1✔
739
        rp := apicapi.NewVnsSvcRedirectPol(tenantName, name)
1✔
740
        rp.SetAttr("thresholdDownAction", "deny")
1✔
741
        if cont.config.DisableResilientHashing {
1✔
742
                rp.SetAttr("resilientHashEnabled", "no")
×
743
        }
×
744
        rpDn := rp.GetDn()
1✔
745
        for _, node := range nodes {
2✔
746
                cont.indexMutex.Lock()
1✔
747
                serviceEp, ok := nodeMap[node]
1✔
748
                if !ok {
1✔
749
                        continue
×
750
                }
751
                if serviceEp.Ipv4 != nil {
2✔
752
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv4.String(),
1✔
753
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
1✔
754
                }
1✔
755
                if serviceEp.Ipv6 != nil {
1✔
756
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv6.String(),
×
757
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
×
758
                }
×
759
                cont.indexMutex.Unlock()
1✔
760
        }
761
        if monPolDn != "" && enablePbrTracking {
2✔
762
                rp.AddChild(apicapi.NewVnsRsIPSLAMonitoringPol(rpDn, monPolDn))
1✔
763
        }
1✔
764
        return rp, rpDn
1✔
765
}
766

767
func apicExtNetCreate(enDn string, ingress string, ipv4 bool,
768
        cidr bool, sharedSec bool) apicapi.ApicObject {
1✔
769
        if !cidr {
2✔
770
                if ipv4 {
2✔
771
                        ingress += "/32"
1✔
772
                } else {
1✔
773
                        ingress += "/128"
×
774
                }
×
775
        }
776
        subnet := apicapi.NewL3extSubnet(enDn, ingress, "", "")
1✔
777
        if sharedSec {
2✔
778
                subnet.SetAttr("scope", "import-security,shared-security")
1✔
779
        }
1✔
780
        return subnet
1✔
781
}
782

783
func apicExtNet(name string, tenantName string, l3Out string,
784
        ingresses []string, sharedSecurity bool, snat bool) apicapi.ApicObject {
1✔
785
        en := apicapi.NewL3extInstP(tenantName, l3Out, name)
1✔
786
        enDn := en.GetDn()
1✔
787
        if snat {
2✔
788
                en.AddChild(apicapi.NewFvRsCons(enDn, name))
1✔
789
        } else {
2✔
790
                en.AddChild(apicapi.NewFvRsProv(enDn, name))
1✔
791
        }
1✔
792

793
        for _, ingress := range ingresses {
2✔
794
                ip, _, _ := net.ParseCIDR(ingress)
1✔
795
                // If ingress is a subnet
1✔
796
                if ip != nil {
2✔
797
                        if ip != nil && ip.To4() != nil {
2✔
798
                                subnet := apicExtNetCreate(enDn, ingress, true, true, sharedSecurity)
1✔
799
                                en.AddChild(subnet)
1✔
800
                        } else if ip != nil && ip.To16() != nil {
1✔
801
                                subnet := apicExtNetCreate(enDn, ingress, false, true, sharedSecurity)
×
802
                                en.AddChild(subnet)
×
803
                        }
×
804
                } else {
1✔
805
                        // If ingress is an IP address
1✔
806
                        ip := net.ParseIP(ingress)
1✔
807
                        if ip != nil && ip.To4() != nil {
2✔
808
                                subnet := apicExtNetCreate(enDn, ingress, true, false, sharedSecurity)
1✔
809
                                en.AddChild(subnet)
1✔
810
                        } else if ip != nil && ip.To16() != nil {
1✔
811
                                subnet := apicExtNetCreate(enDn, ingress, false, false, sharedSecurity)
×
812
                                en.AddChild(subnet)
×
813
                        }
×
814
                }
815
        }
816
        return en
1✔
817
}
818

819
func apicDefaultEgCons(conName string, tenantName string,
820
        appProfile string, epg string) apicapi.ApicObject {
×
821
        enDn := fmt.Sprintf("uni/tn-%s/ap-%s/epg-%s", tenantName, appProfile, epg)
×
822
        return apicapi.NewFvRsCons(enDn, conName)
×
823
}
×
824

825
func apicExtNetCons(conName string, tenantName string,
826
        l3Out string, net string) apicapi.ApicObject {
1✔
827
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
828
        return apicapi.NewFvRsCons(enDn, conName)
1✔
829
}
1✔
830

831
func apicExtNetProv(conName string, tenantName string,
832
        l3Out string, net string) apicapi.ApicObject {
1✔
833
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
834
        return apicapi.NewFvRsProv(enDn, conName)
1✔
835
}
1✔
836

837
// stringInSlice reports whether str is equal to at least one element of list.
func stringInSlice(str string, list []string) bool {
	for idx := range list {
		if list[idx] == str {
			return true
		}
	}
	return false
}
846

847
// validScope reports whether scope is an accepted contract scope: the empty
// string, "context", "tenant" or "global". The match is case-sensitive.
func validScope(scope string) bool {
	switch scope {
	case "", "context", "tenant", "global":
		return true
	default:
		return false
	}
}
1✔
851

852
func (cont *AciController) getGraphNameFromContract(name, tenantName string) (string, error) {
×
853
        var graphName string
×
854
        args := []string{
×
855
                "query-target=subtree",
×
856
        }
×
857
        url := fmt.Sprintf("/api/node/mo/uni/tn-%s/brc-%s.json?%s", tenantName, name, strings.Join(args, "&"))
×
858
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
859
        if err != nil {
×
860
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
861
                return graphName, err
×
862
        }
×
863
        for _, obj := range apicresp.Imdata {
×
864
                for class, body := range obj {
×
865
                        if class == "vzRsSubjGraphAtt" {
×
866
                                tnVnsAbsGraphName, ok := body.Attributes["tnVnsAbsGraphName"].(string)
×
867
                                if ok {
×
868
                                        graphName = tnVnsAbsGraphName
×
869
                                }
×
870
                                break
×
871
                        }
872
                }
873
        }
874
        cont.log.Debug("graphName: ", graphName)
×
875
        return graphName, err
×
876
}
877

878
func apicContract(conName string, tenantName string,
879
        graphName string, scopeName string, isSnatPbrFltrChain bool,
880
        customSGAnnot bool) apicapi.ApicObject {
1✔
881
        con := apicapi.NewVzBrCP(tenantName, conName)
1✔
882
        if scopeName != "" && scopeName != "context" {
2✔
883
                con.SetAttr("scope", scopeName)
1✔
884
        }
1✔
885
        cs := apicapi.NewVzSubj(con.GetDn(), "loadbalancedservice")
1✔
886
        csDn := cs.GetDn()
1✔
887
        if isSnatPbrFltrChain {
2✔
888
                cs.SetAttr("revFltPorts", "no")
1✔
889
                inTerm := apicapi.NewVzInTerm(csDn)
1✔
890
                outTerm := apicapi.NewVzOutTerm(csDn)
1✔
891
                inTerm.AddChild(apicapi.NewVzRsInTermGraphAtt(inTerm.GetDn(), graphName))
1✔
892
                inTerm.AddChild(apicapi.NewVzRsFiltAtt(inTerm.GetDn(), conName+"_fromCons-toProv"))
1✔
893
                outTerm.AddChild(apicapi.NewVzRsOutTermGraphAtt(outTerm.GetDn(), graphName))
1✔
894
                outTerm.AddChild(apicapi.NewVzRsFiltAtt(outTerm.GetDn(), conName+"_fromProv-toCons"))
1✔
895
                cs.AddChild(inTerm)
1✔
896
                cs.AddChild(outTerm)
1✔
897
        } else {
2✔
898
                cs.AddChild(apicapi.NewVzRsSubjGraphAtt(csDn, graphName, customSGAnnot))
1✔
899
                cs.AddChild(apicapi.NewVzRsSubjFiltAtt(csDn, conName))
1✔
900
        }
1✔
901
        con.AddChild(cs)
1✔
902
        return con
1✔
903
}
904

905
func apicDevCtx(name string, tenantName string,
906
        graphName string, deviceName string, bdName string, rpDn string, isSnatPbrFltrChain bool) apicapi.ApicObject {
1✔
907
        cc := apicapi.NewVnsLDevCtx(tenantName, name, graphName, "loadbalancer")
1✔
908
        ccDn := cc.GetDn()
1✔
909
        graphDn := fmt.Sprintf("uni/tn-%s/lDevVip-%s", tenantName, deviceName)
1✔
910
        lifDn := fmt.Sprintf("%s/lIf-%s", graphDn, "interface")
1✔
911
        bdDn := fmt.Sprintf("uni/tn-%s/BD-%s", tenantName, bdName)
1✔
912
        cc.AddChild(apicapi.NewVnsRsLDevCtxToLDev(ccDn, graphDn))
1✔
913
        rpDnBase := rpDn
1✔
914
        for _, ctxConn := range []string{"consumer", "provider"} {
2✔
915
                lifCtx := apicapi.NewVnsLIfCtx(ccDn, ctxConn)
1✔
916
                if isSnatPbrFltrChain {
2✔
917
                        if ctxConn == "consumer" {
2✔
918
                                rpDn = rpDnBase + "_Cons"
1✔
919
                        } else {
2✔
920
                                rpDn = rpDnBase + "_Prov"
1✔
921
                        }
1✔
922
                }
923
                lifCtxDn := lifCtx.GetDn()
1✔
924
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToSvcRedirectPol(lifCtxDn, rpDn))
1✔
925
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToBD(lifCtxDn, bdDn))
1✔
926
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToLIf(lifCtxDn, lifDn))
1✔
927
                cc.AddChild(lifCtx)
1✔
928
        }
929
        return cc
1✔
930
}
931

932
func apicFilterEntry(filterDn string, count string, p_start string,
933
        p_end string, protocol string, stateful string, snat bool, outTerm bool) apicapi.ApicObject {
1✔
934
        fe := apicapi.NewVzEntry(filterDn, count)
1✔
935
        fe.SetAttr("etherT", "ip")
1✔
936
        fe.SetAttr("prot", protocol)
1✔
937
        if snat {
2✔
938
                if outTerm {
2✔
939
                        if protocol == "tcp" {
2✔
940
                                fe.SetAttr("tcpRules", "est")
1✔
941
                        }
1✔
942
                        // Reverse the ports for outTerm
943
                        fe.SetAttr("dFromPort", p_start)
1✔
944
                        fe.SetAttr("dToPort", p_end)
1✔
945
                } else {
1✔
946
                        fe.SetAttr("sFromPort", p_start)
1✔
947
                        fe.SetAttr("sToPort", p_end)
1✔
948
                }
1✔
949
        } else {
1✔
950
                fe.SetAttr("dFromPort", p_start)
1✔
951
                fe.SetAttr("dToPort", p_end)
1✔
952
        }
1✔
953
        fe.SetAttr("stateful", stateful)
1✔
954
        return fe
1✔
955
}
956
func apicFilter(name string, tenantName string,
957
        portSpec []v1.ServicePort, snat bool, snatRange portRangeSnat) apicapi.ApicObject {
1✔
958
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
959
        filterDn := filter.GetDn()
1✔
960

1✔
961
        var i int
1✔
962
        var port v1.ServicePort
1✔
963
        for i, port = range portSpec {
2✔
964
                pstr := strconv.Itoa(int(port.Port))
1✔
965
                proto := getProtocolStr(port.Protocol)
1✔
966
                fe := apicFilterEntry(filterDn, strconv.Itoa(i), pstr,
1✔
967
                        pstr, proto, "no", false, false)
1✔
968
                filter.AddChild(fe)
1✔
969
        }
1✔
970

971
        if snat {
1✔
972
                portSpec := []portRangeSnat{snatRange}
×
973
                p_start := strconv.Itoa(portSpec[0].start)
×
974
                p_end := strconv.Itoa(portSpec[0].end)
×
975

×
976
                fe1 := apicFilterEntry(filterDn, strconv.Itoa(i+1), p_start,
×
977
                        p_end, "tcp", "no", false, false)
×
978
                filter.AddChild(fe1)
×
979
                fe2 := apicFilterEntry(filterDn, strconv.Itoa(i+2), p_start,
×
980
                        p_end, "udp", "no", false, false)
×
981
                filter.AddChild(fe2)
×
982
        }
×
983
        return filter
1✔
984
}
985

986
func apicFilterSnat(name string, tenantName string,
987
        portSpec []portRangeSnat, outTerm bool) apicapi.ApicObject {
1✔
988
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
989
        filterDn := filter.GetDn()
1✔
990

1✔
991
        p_start := strconv.Itoa(portSpec[0].start)
1✔
992
        p_end := strconv.Itoa(portSpec[0].end)
1✔
993

1✔
994
        fe := apicFilterEntry(filterDn, "0", p_start,
1✔
995
                p_end, "tcp", "no", true, outTerm)
1✔
996
        filter.AddChild(fe)
1✔
997
        fe1 := apicFilterEntry(filterDn, "1", p_start,
1✔
998
                p_end, "udp", "no", true, outTerm)
1✔
999
        filter.AddChild(fe1)
1✔
1000

1✔
1001
        return filter
1✔
1002
}
1✔
1003

1004
func (cont *AciController) updateServiceDeviceInstance(key string,
1005
        service *v1.Service) error {
1✔
1006
        cont.indexMutex.Lock()
1✔
1007
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
1008
        cont.serviceEndPoints.GetnodesMetadata(key, service, nodeMap)
1✔
1009
        cont.indexMutex.Unlock()
1✔
1010

1✔
1011
        var nodes []string
1✔
1012
        for node := range nodeMap {
2✔
1013
                nodes = append(nodes, node)
1✔
1014
        }
1✔
1015
        sort.Strings(nodes)
1✔
1016
        name := cont.aciNameForKey("svc", key)
1✔
1017
        var conScope string
1✔
1018
        scopeVal, ok := service.ObjectMeta.Annotations[metadata.ServiceContractScopeAnnotation]
1✔
1019
        if ok {
2✔
1020
                normScopeVal := strings.ToLower(scopeVal)
1✔
1021
                if !validScope(normScopeVal) {
1✔
1022
                        errString := "Invalid service contract scope value provided " + scopeVal
×
1023
                        err := errors.New(errString)
×
1024
                        serviceLogger(cont.log, service).Error("Could not create contract: ", err)
×
1025
                        return err
×
1026
                } else {
1✔
1027
                        conScope = normScopeVal
1✔
1028
                }
1✔
1029
        } else {
1✔
1030
                conScope = DefaultServiceContractScope
1✔
1031
        }
1✔
1032

1033
        var sharedSecurity bool
1✔
1034
        if conScope == "global" {
2✔
1035
                sharedSecurity = true
1✔
1036
        } else {
2✔
1037
                sharedSecurity = DefaultServiceExtSubNetShared
1✔
1038
        }
1✔
1039

1040
        graphName := cont.aciNameForKey("svc", "global")
1✔
1041
        deviceName := cont.aciNameForKey("svc", "global")
1✔
1042
        _, customSGAnnPresent := service.ObjectMeta.Annotations[metadata.ServiceGraphNameAnnotation]
1✔
1043
        if customSGAnnPresent {
1✔
1044
                customSG, err := cont.getGraphNameFromContract(name, cont.config.AciVrfTenant)
×
1045
                if err == nil {
×
1046
                        graphName = customSG
×
1047
                }
×
1048
        }
1049
        cont.log.Debug("Using service graph ", graphName, " for service ", key)
1✔
1050

1✔
1051
        var serviceObjs apicapi.ApicSlice
1✔
1052
        if len(nodes) > 0 {
2✔
1053
                // 1. Service redirect policy
1✔
1054
                // The service redirect policy contains the MAC address
1✔
1055
                // and IP address of each of the service endpoints for
1✔
1056
                // each node that hosts a pod for this service.  The
1✔
1057
                // example below shows the case of two nodes.
1✔
1058
                rp, rpDn :=
1✔
1059
                        cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
1✔
1060
                                nodeMap, cont.staticMonPolDn(), cont.config.AciPbrTrackingNonSnat)
1✔
1061
                serviceObjs = append(serviceObjs, rp)
1✔
1062

1✔
1063
                // 2. Service graph contract and external network
1✔
1064
                // The service graph contract must be bound to the service
1✔
1065
                // graph.  This contract must be consumed by the default
1✔
1066
                // layer 3 network and provided by the service layer 3
1✔
1067
                // network.
1✔
1068
                {
2✔
1069
                        var ingresses []string
1✔
1070
                        for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1071
                                ingresses = append(ingresses, ingress.IP)
1✔
1072
                        }
1✔
1073
                        serviceObjs = append(serviceObjs,
1✔
1074
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1075
                                        cont.config.AciL3Out, ingresses, sharedSecurity, false))
1✔
1076
                }
1077

1078
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, false, customSGAnnPresent)
1✔
1079
                serviceObjs = append(serviceObjs, contract)
1✔
1080
                for _, net := range cont.config.AciExtNetworks {
2✔
1081
                        serviceObjs = append(serviceObjs,
1✔
1082
                                apicExtNetCons(name, cont.config.AciVrfTenant,
1✔
1083
                                        cont.config.AciL3Out, net))
1✔
1084
                }
1✔
1085

1086
                if cont.config.AddExternalContractToDefaultEPG && service.Spec.Type == v1.ServiceTypeLoadBalancer {
1✔
1087
                        defaultEpgTenant := cont.config.DefaultEg.PolicySpace
×
1088
                        defaultEpgStringSplit := strings.Split(cont.config.DefaultEg.Name, "|")
×
1089
                        var defaultEpgName, appProfile string
×
1090
                        if len(defaultEpgStringSplit) > 1 {
×
1091
                                appProfile = defaultEpgStringSplit[0]
×
1092
                                defaultEpgName = defaultEpgStringSplit[1]
×
1093
                        } else {
×
1094
                                appProfile = cont.config.AppProfile
×
1095
                                defaultEpgName = defaultEpgStringSplit[0]
×
1096
                        }
×
1097
                        serviceObjs = append(serviceObjs,
×
1098
                                apicDefaultEgCons(name, defaultEpgTenant, appProfile, defaultEpgName))
×
1099
                }
1100

1101
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1102
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1103

1✔
1104
                _, snat := cont.snatServices[key]
1✔
1105
                filter := apicFilter(name, cont.config.AciVrfTenant,
1✔
1106
                        service.Spec.Ports, snat, defaultPortRange)
1✔
1107
                serviceObjs = append(serviceObjs, filter)
1✔
1108

1✔
1109
                // 3. Device cluster context
1✔
1110
                // The logical device context binds the service contract
1✔
1111
                // to the redirect policy and the device cluster and
1✔
1112
                // bridge domain for the device cluster.
1✔
1113
                serviceObjs = append(serviceObjs,
1✔
1114
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, deviceName,
1✔
1115
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, false))
1✔
1116
        }
1117

1118
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1119
        return nil
1✔
1120
}
1121

1122
func (cont *AciController) updateServiceDeviceInstanceSnat(key string) error {
1✔
1123
        nodeList := cont.nodeIndexer.List()
1✔
1124
        cont.indexMutex.Lock()
1✔
1125
        if len(cont.nodeServiceMetaCache) == 0 {
2✔
1126
                cont.indexMutex.Unlock()
1✔
1127
                return nil
1✔
1128
        }
1✔
1129
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
1130
        sort.Slice(nodeList, func(i, j int) bool {
2✔
1131
                nodeA := nodeList[i].(*v1.Node)
1✔
1132
                nodeB := nodeList[j].(*v1.Node)
1✔
1133
                return nodeA.ObjectMeta.Name < nodeB.ObjectMeta.Name
1✔
1134
        })
1✔
1135
        for itr, nodeItem := range nodeList {
2✔
1136
                if itr == cont.config.MaxSvcGraphNodes {
1✔
1137
                        break
×
1138
                }
1139
                node := nodeItem.(*v1.Node)
1✔
1140
                nodeName := node.ObjectMeta.Name
1✔
1141
                nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
1142
                if !ok {
2✔
1143
                        continue
1✔
1144
                }
1145
                _, ok = cont.fabricPathForNode(nodeName)
1✔
1146
                if !ok {
1✔
1147
                        continue
×
1148
                }
1149
                nodeLabels := node.ObjectMeta.Labels
1✔
1150
                excludeNode := cont.nodeLabelsInExcludeList(nodeLabels)
1✔
1151
                if excludeNode {
1✔
1152
                        continue
×
1153
                }
1154
                nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
1155
        }
1156
        cont.indexMutex.Unlock()
1✔
1157

1✔
1158
        var nodes []string
1✔
1159
        for node := range nodeMap {
2✔
1160
                nodes = append(nodes, node)
1✔
1161
        }
1✔
1162
        sort.Strings(nodes)
1✔
1163
        name := cont.aciNameForKey("snat", key)
1✔
1164
        var conScope = cont.config.SnatSvcContractScope
1✔
1165
        sharedSecurity := true
1✔
1166

1✔
1167
        graphName := cont.aciNameForKey("svc", "global")
1✔
1168
        var serviceObjs apicapi.ApicSlice
1✔
1169
        if len(nodes) > 0 {
2✔
1170
                // 1. Service redirect policy
1✔
1171
                // The service redirect policy contains the MAC address
1✔
1172
                // and IP address of each of the service endpoints for
1✔
1173
                // each node that hosts a pod for this service.
1✔
1174
                // For SNAT with the introduction of filter-chain usage, to work-around
1✔
1175
                // an APIC limitation, creating two PBR policies with same nodes.
1✔
1176
                var rpDn string
1✔
1177
                var rp apicapi.ApicObject
1✔
1178
                if cont.apicConn.SnatPbrFltrChain {
2✔
1179
                        rpCons, rpDnCons :=
1✔
1180
                                cont.apicRedirectPol(name+"_Cons", cont.config.AciVrfTenant, nodes,
1✔
1181
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1182
                        serviceObjs = append(serviceObjs, rpCons)
1✔
1183
                        rpProv, _ :=
1✔
1184
                                cont.apicRedirectPol(name+"_Prov", cont.config.AciVrfTenant, nodes,
1✔
1185
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1186
                        serviceObjs = append(serviceObjs, rpProv)
1✔
1187
                        rpDn = strings.TrimSuffix(rpDnCons, "_Cons")
1✔
1188
                } else {
1✔
1189
                        rp, rpDn =
×
1190
                                cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
×
1191
                                        nodeMap, cont.staticMonPolDn(), true)
×
1192
                        serviceObjs = append(serviceObjs, rp)
×
1193
                }
×
1194
                // 2. Service graph contract and external network
1195
                // The service graph contract must be bound to the
1196
                // service
1197
                // graph.  This contract must be consumed by the default
1198
                // layer 3 network and provided by the service layer 3
1199
                // network.
1200
                {
1✔
1201
                        var ingresses []string
1✔
1202
                        for _, policy := range cont.snatPolicyCache {
2✔
1203
                                ingresses = append(ingresses, policy.SnatIp...)
1✔
1204
                        }
1✔
1205
                        serviceObjs = append(serviceObjs,
1✔
1206
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1207
                                        cont.config.AciL3Out, ingresses, sharedSecurity, true))
1✔
1208
                }
1209

1210
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, cont.apicConn.SnatPbrFltrChain, false)
1✔
1211
                serviceObjs = append(serviceObjs, contract)
1✔
1212

1✔
1213
                for _, net := range cont.config.AciExtNetworks {
2✔
1214
                        serviceObjs = append(serviceObjs,
1✔
1215
                                apicExtNetProv(name, cont.config.AciVrfTenant,
1✔
1216
                                        cont.config.AciL3Out, net))
1✔
1217
                }
1✔
1218

1219
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1220
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1221
                portSpec := []portRangeSnat{defaultPortRange}
1✔
1222
                if cont.apicConn.SnatPbrFltrChain {
2✔
1223
                        filterIn := apicFilterSnat(name+"_fromCons-toProv", cont.config.AciVrfTenant, portSpec, false)
1✔
1224
                        serviceObjs = append(serviceObjs, filterIn)
1✔
1225
                        filterOut := apicFilterSnat(name+"_fromProv-toCons", cont.config.AciVrfTenant, portSpec, true)
1✔
1226
                        serviceObjs = append(serviceObjs, filterOut)
1✔
1227
                } else {
1✔
1228
                        filter := apicFilterSnat(name, cont.config.AciVrfTenant, portSpec, false)
×
1229
                        serviceObjs = append(serviceObjs, filter)
×
1230
                }
×
1231
                // 3. Device cluster context
1232
                // The logical device context binds the service contract
1233
                // to the redirect policy and the device cluster and
1234
                // bridge domain for the device cluster.
1235
                serviceObjs = append(serviceObjs,
1✔
1236
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, graphName,
1✔
1237
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, cont.apicConn.SnatPbrFltrChain))
1✔
1238
        }
1239
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1240
        return nil
1✔
1241
}
1242

1243
func (cont *AciController) nodeLabelsInExcludeList(Labels map[string]string) bool {
1✔
1244
        nodeSnatRedirectExclude := cont.config.NodeSnatRedirectExclude
1✔
1245

1✔
1246
        for _, nodeGroup := range nodeSnatRedirectExclude {
1✔
1247
                if len(nodeGroup.Labels) == 0 {
×
1248
                        continue
×
1249
                }
1250
                matchFound := true
×
1251
                for _, label := range nodeGroup.Labels {
×
1252
                        if _, ok := Labels["node-role.kubernetes.io/"+label]; !ok {
×
1253
                                matchFound = false
×
1254
                                break
×
1255
                        }
1256
                }
1257
                if matchFound {
×
1258
                        return true
×
1259
                }
×
1260
        }
1261
        return false
1✔
1262
}
1263

1264
func (cont *AciController) queueServiceUpdateByKey(key string) {
1✔
1265
        cont.serviceQueue.Add(key)
1✔
1266
}
1✔
1267

1268
func (cont *AciController) queueServiceUpdate(service *v1.Service) {
1✔
1269
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
1270
        if err != nil {
1✔
1271
                serviceLogger(cont.log, service).
×
1272
                        Error("Could not create service key: ", err)
×
1273
                return
×
1274
        }
×
1275
        cont.serviceQueue.Add(key)
1✔
1276
}
1277

1278
func apicDeviceCluster(name string, vrfTenant string,
1279
        physDom string, encap string,
1280
        nodes []string, nodeMap map[string]string) (apicapi.ApicObject, string) {
1✔
1281
        dc := apicapi.NewVnsLDevVip(vrfTenant, name)
1✔
1282
        dc.SetAttr("managed", "no")
1✔
1283
        dcDn := dc.GetDn()
1✔
1284
        dc.AddChild(apicapi.NewVnsRsALDevToPhysDomP(dcDn,
1✔
1285
                fmt.Sprintf("uni/phys-%s", physDom)))
1✔
1286
        lif := apicapi.NewVnsLIf(dcDn, "interface")
1✔
1287
        lif.SetAttr("encap", encap)
1✔
1288
        lifDn := lif.GetDn()
1✔
1289

1✔
1290
        for _, node := range nodes {
2✔
1291
                path, ok := nodeMap[node]
1✔
1292
                if !ok {
1✔
1293
                        continue
×
1294
                }
1295

1296
                cdev := apicapi.NewVnsCDev(dcDn, node)
1✔
1297
                cif := apicapi.NewVnsCif(cdev.GetDn(), "interface")
1✔
1298
                cif.AddChild(apicapi.NewVnsRsCIfPathAtt(cif.GetDn(), path))
1✔
1299
                cdev.AddChild(cif)
1✔
1300
                lif.AddChild(apicapi.NewVnsRsCIfAttN(lifDn, cif.GetDn()))
1✔
1301
                dc.AddChild(cdev)
1✔
1302
        }
1303

1304
        dc.AddChild(lif)
1✔
1305

1✔
1306
        return dc, dcDn
1✔
1307
}
1308

1309
func apicServiceGraph(name string, tenantName string,
1310
        dcDn string) apicapi.ApicObject {
1✔
1311
        sg := apicapi.NewVnsAbsGraph(tenantName, name)
1✔
1312
        sgDn := sg.GetDn()
1✔
1313
        var provDn string
1✔
1314
        var consDn string
1✔
1315
        var cTermDn string
1✔
1316
        var pTermDn string
1✔
1317
        {
2✔
1318
                an := apicapi.NewVnsAbsNode(sgDn, "loadbalancer")
1✔
1319
                an.SetAttr("managed", "no")
1✔
1320
                an.SetAttr("routingMode", "Redirect")
1✔
1321
                anDn := an.GetDn()
1✔
1322
                cons := apicapi.NewVnsAbsFuncConn(anDn, "consumer")
1✔
1323
                consDn = cons.GetDn()
1✔
1324
                an.AddChild(cons)
1✔
1325
                prov := apicapi.NewVnsAbsFuncConn(anDn, "provider")
1✔
1326
                provDn = prov.GetDn()
1✔
1327
                an.AddChild(prov)
1✔
1328
                an.AddChild(apicapi.NewVnsRsNodeToLDev(anDn, dcDn))
1✔
1329
                sg.AddChild(an)
1✔
1330
        }
1✔
1331
        {
1✔
1332
                tnc := apicapi.NewVnsAbsTermNodeCon(sgDn, "T1")
1✔
1333
                tncDn := tnc.GetDn()
1✔
1334
                cTerm := apicapi.NewVnsAbsTermConn(tncDn)
1✔
1335
                cTermDn = cTerm.GetDn()
1✔
1336
                tnc.AddChild(cTerm)
1✔
1337
                tnc.AddChild(apicapi.NewVnsInTerm(tncDn))
1✔
1338
                tnc.AddChild(apicapi.NewVnsOutTerm(tncDn))
1✔
1339
                sg.AddChild(tnc)
1✔
1340
        }
1✔
1341
        {
1✔
1342
                tnp := apicapi.NewVnsAbsTermNodeProv(sgDn, "T2")
1✔
1343
                tnpDn := tnp.GetDn()
1✔
1344
                pTerm := apicapi.NewVnsAbsTermConn(tnpDn)
1✔
1345
                pTermDn = pTerm.GetDn()
1✔
1346
                tnp.AddChild(pTerm)
1✔
1347
                tnp.AddChild(apicapi.NewVnsInTerm(tnpDn))
1✔
1348
                tnp.AddChild(apicapi.NewVnsOutTerm(tnpDn))
1✔
1349
                sg.AddChild(tnp)
1✔
1350
        }
1✔
1351
        {
1✔
1352
                acc := apicapi.NewVnsAbsConnection(sgDn, "C1")
1✔
1353
                acc.SetAttr("connDir", "provider")
1✔
1354
                accDn := acc.GetDn()
1✔
1355
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, consDn))
1✔
1356
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, cTermDn))
1✔
1357
                sg.AddChild(acc)
1✔
1358
        }
1✔
1359
        {
1✔
1360
                acp := apicapi.NewVnsAbsConnection(sgDn, "C2")
1✔
1361
                acp.SetAttr("connDir", "provider")
1✔
1362
                acpDn := acp.GetDn()
1✔
1363
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, provDn))
1✔
1364
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, pTermDn))
1✔
1365
                sg.AddChild(acp)
1✔
1366
        }
1✔
1367
        return sg
1✔
1368
}
1369
func (cont *AciController) updateDeviceCluster() {
1✔
1370
        nodeMap := make(map[string]string)
1✔
1371
        cont.indexMutex.Lock()
1✔
1372
        for node := range cont.nodeOpflexDevice {
2✔
1373
                cont.log.Debug("Processing node in nodeOpflexDevice cache : ", node)
1✔
1374
                fabricPath, ok := cont.fabricPathForNode(node)
1✔
1375
                if !ok {
2✔
1376
                        continue
1✔
1377
                }
1378
                nodeMap[node] = fabricPath
1✔
1379
        }
1380

1381
        // For clusters other than OpenShift On OpenStack,
1382
        // openStackFabricPathDnMap will be empty
1383
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1384
                nodeMap[host] = opflexOdevInfo.fabricPathDn
×
1385
        }
×
1386

1387
        // For OpenShift On OpenStack clusters,
1388
        // hostFabricPathDnMap will be empty
1389
        for _, hostInfo := range cont.hostFabricPathDnMap {
1✔
1390
                if hostInfo.fabricPathDn != "" {
×
1391
                        nodeMap[hostInfo.host] = hostInfo.fabricPathDn
×
1392
                }
×
1393
        }
1394
        cont.indexMutex.Unlock()
1✔
1395

1✔
1396
        var nodes []string
1✔
1397
        for node := range nodeMap {
2✔
1398
                nodes = append(nodes, node)
1✔
1399
        }
1✔
1400
        sort.Strings(nodes)
1✔
1401

1✔
1402
        name := cont.aciNameForKey("svc", "global")
1✔
1403
        var serviceObjs apicapi.ApicSlice
1✔
1404

1✔
1405
        // 1. Device cluster:
1✔
1406
        // The device cluster is a set of physical paths that need to be
1✔
1407
        // created for each node in the cluster, that correspond to the
1✔
1408
        // service interface for each node.
1✔
1409
        dc, dcDn := apicDeviceCluster(name, cont.config.AciVrfTenant,
1✔
1410
                cont.config.AciServicePhysDom, cont.config.AciServiceEncap,
1✔
1411
                nodes, nodeMap)
1✔
1412
        serviceObjs = append(serviceObjs, dc)
1✔
1413

1✔
1414
        // 2. Service graph template
1✔
1415
        // The service graph controls how the traffic will be redirected.
1✔
1416
        // A service graph must be created for each device cluster.
1✔
1417
        serviceObjs = append(serviceObjs,
1✔
1418
                apicServiceGraph(name, cont.config.AciVrfTenant, dcDn))
1✔
1419

1✔
1420
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1421
}
1422

1423
func (cont *AciController) fabricPathLogger(node string,
1424
        obj apicapi.ApicObject) *logrus.Entry {
1✔
1425
        return cont.log.WithFields(logrus.Fields{
1✔
1426
                "fabricPath": obj.GetAttr("fabricPathDn"),
1✔
1427
                "mac":        obj.GetAttr("mac"),
1✔
1428
                "node":       node,
1✔
1429
                "obj":        obj,
1✔
1430
        })
1✔
1431
}
1✔
1432

1433
func (cont *AciController) setOpenStackSystemId() string {
×
1434

×
1435
        // 1) get opflexIDEp with containerName == <node name of any one of the openshift nodes>
×
1436
        // 2) extract OpenStack system id from compHvDn attribute
×
1437
        //    comp/prov-OpenStack/ctrlr-[k8s-scale]-k8s-scale/hv-overcloud-novacompute-0 - sample compHvDn,
×
1438
        //    where k8s-scale is the system id
×
1439

×
1440
        var systemId string
×
1441
        nodeList := cont.nodeIndexer.List()
×
1442
        if len(nodeList) < 1 {
×
1443
                return systemId
×
1444
        }
×
1445
        node := nodeList[0].(*v1.Node)
×
1446
        nodeName := node.ObjectMeta.Name
×
1447
        opflexIDEpFilter := fmt.Sprintf("query-target-filter=and(eq(opflexIDEp.containerName,\"%s\"))", nodeName)
×
1448
        opflexIDEpArgs := []string{
×
1449
                opflexIDEpFilter,
×
1450
        }
×
1451
        url := fmt.Sprintf("/api/node/class/opflexIDEp.json?%s", strings.Join(opflexIDEpArgs, "&"))
×
1452
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1453
        if err != nil {
×
1454
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1455
                return systemId
×
1456
        }
×
1457
        for _, obj := range apicresp.Imdata {
×
1458
                for _, body := range obj {
×
1459
                        compHvDn, ok := body.Attributes["compHvDn"].(string)
×
1460
                        if ok {
×
1461
                                systemId = compHvDn[strings.IndexByte(compHvDn, '[')+1 : strings.IndexByte(compHvDn, ']')]
×
1462
                                break
×
1463
                        }
1464
                }
1465
        }
1466
        cont.indexMutex.Lock()
×
1467
        cont.openStackSystemId = systemId
×
1468
        cont.log.Info("Setting OpenStack system id : ", cont.openStackSystemId)
×
1469
        cont.indexMutex.Unlock()
×
1470
        return systemId
×
1471
}
1472

1473
// Returns true when a new OpenStack opflexODev is added
1474
func (cont *AciController) openStackOpflexOdevUpdate(obj apicapi.ApicObject) bool {
×
1475

×
1476
        // If opflexOdev compHvDn contains comp/prov-OpenShift/ctrlr-[<systemid>]-<systemid>,
×
1477
        // it means that it is an OpenStack OpflexOdev which belongs to OpenStack with system id <systemid>
×
1478

×
1479
        var deviceClusterUpdate bool
×
1480
        compHvDn := obj.GetAttrStr("compHvDn")
×
1481
        if strings.Contains(compHvDn, "prov-OpenStack") {
×
1482
                cont.indexMutex.Lock()
×
1483
                systemId := cont.openStackSystemId
×
1484
                cont.indexMutex.Unlock()
×
1485
                if systemId == "" {
×
1486
                        systemId = cont.setOpenStackSystemId()
×
1487
                }
×
1488
                if systemId == "" {
×
1489
                        cont.log.Error("Failed  to get OpenStack system id")
×
1490
                        return deviceClusterUpdate
×
1491
                }
×
1492
                prefix := fmt.Sprintf("comp/prov-OpenStack/ctrlr-[%s]-%s", systemId, systemId)
×
1493
                if strings.Contains(compHvDn, prefix) {
×
1494
                        cont.log.Info("Received notification for OpenStack opflexODev update, hostName: ",
×
1495
                                obj.GetAttrStr("hostName"), " dn: ", obj.GetAttrStr("dn"))
×
1496
                        cont.indexMutex.Lock()
×
1497
                        opflexOdevInfo, ok := cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")]
×
1498
                        if ok {
×
1499
                                opflexOdevInfo.opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1500
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = opflexOdevInfo
×
1501
                        } else {
×
1502
                                var openstackopflexodevinfo openstackOpflexOdevInfo
×
1503
                                opflexODevDn := make(map[string]struct{})
×
1504
                                opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1505
                                openstackopflexodevinfo.fabricPathDn = obj.GetAttrStr("fabricPathDn")
×
1506
                                openstackopflexodevinfo.opflexODevDn = opflexODevDn
×
1507
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = openstackopflexodevinfo
×
1508
                                deviceClusterUpdate = true
×
1509
                        }
×
1510
                        cont.indexMutex.Unlock()
×
1511
                }
1512
        }
1513
        return deviceClusterUpdate
×
1514
}
1515

1516
func (cont *AciController) infraRtAttEntPDeleted(dn string) {
×
1517
        // dn format : uni/infra/attentp-k8s-scale-esxi-aaep/rtattEntP-[uni/infra/funcprof/accbundle-esxi1-vpc-ipg]
×
1518
        cont.log.Info("Processing delete of infraRtAttEntP: ", dn)
×
1519

×
1520
        // extract uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1521
        re := regexp.MustCompile(`\[(.*?)\]`)
×
1522
        matches := re.FindStringSubmatch(dn)
×
1523

×
1524
        if len(matches) < 2 {
×
1525
                cont.log.Error("Failed to extract ipg from dn : ", dn)
×
1526
                return
×
1527
        }
×
1528
        tdn := matches[1]
×
1529

×
1530
        cont.indexMutex.Lock()
×
1531
        _, ok := cont.hostFabricPathDnMap[tdn]
×
1532
        if ok {
×
1533
                delete(cont.hostFabricPathDnMap, tdn)
×
1534
                cont.log.Info("Deleted ipg : ", tdn)
×
1535
        }
×
1536
        cont.indexMutex.Unlock()
×
1537

×
1538
        if ok {
×
1539
                cont.updateDeviceCluster()
×
1540
        }
×
1541
}
1542

1543
func (cont *AciController) vpcIfDeleted(dn string) {
×
1544
        var deleted bool
×
1545
        cont.indexMutex.Lock()
×
1546
        for tDn, hostInfo := range cont.hostFabricPathDnMap {
×
1547
                if _, present := hostInfo.vpcIfDn[dn]; present {
×
1548
                        cont.log.Info("Deleting vpcIf, dn :", dn)
×
1549
                        delete(hostInfo.vpcIfDn, dn)
×
1550
                        if len(hostInfo.vpcIfDn) == 0 {
×
1551
                                cont.log.Infof("Removing fabricPathDn(%s) of ipg : %s ", hostInfo.fabricPathDn, hostInfo.host)
×
1552
                                hostInfo.fabricPathDn = ""
×
1553
                                deleted = true
×
1554
                        }
×
1555
                        cont.hostFabricPathDnMap[tDn] = hostInfo
×
1556
                }
1557
        }
1558
        cont.indexMutex.Unlock()
×
1559
        if deleted {
×
1560
                cont.updateDeviceCluster()
×
1561
        }
×
1562
}
1563

1564
func (cont *AciController) vpcIfChanged(obj apicapi.ApicObject) {
×
1565
        if cont.updateHostFabricPathDnMap(obj) {
×
1566
                cont.updateDeviceCluster()
×
1567
        }
×
1568
}
1569

1570
func (cont *AciController) updateHostFabricPathDnMap(obj apicapi.ApicObject) bool {
×
1571
        var accBndlGrpDn, fabricPathDn, dn string
×
1572
        for _, body := range obj {
×
1573
                var ok bool
×
1574
                accBndlGrpDn, ok = body.Attributes["accBndlGrpDn"].(string)
×
1575
                if !ok || (ok && accBndlGrpDn == "") {
×
1576
                        cont.log.Error("accBndlGrpDn missing/empty in vpcIf")
×
1577
                        return false
×
1578
                }
×
1579
                fabricPathDn, ok = body.Attributes["fabricPathDn"].(string)
×
1580
                if !ok && (ok && fabricPathDn == "") {
×
1581
                        cont.log.Error("fabricPathDn missing/empty in vpcIf")
×
1582
                        return false
×
1583
                }
×
1584
                dn, ok = body.Attributes["dn"].(string)
×
1585
                if !ok && (ok && dn == "") {
×
1586
                        cont.log.Error("dn missing/empty in vpcIf")
×
1587
                        return false
×
1588
                }
×
1589
        }
1590
        var updated bool
×
1591
        cont.indexMutex.Lock()
×
1592
        // If accBndlGrpDn exists in hostFabricPathDnMap, the vpcIf belongs to the cluster AEP
×
1593
        hostInfo, exists := cont.hostFabricPathDnMap[accBndlGrpDn]
×
1594
        if exists {
×
1595
                if _, present := hostInfo.vpcIfDn[dn]; !present {
×
1596
                        hostInfo.vpcIfDn[dn] = struct{}{}
×
1597
                        cont.log.Infof("vpcIf processing, dn : %s, accBndlGrpDn: %s", dn, accBndlGrpDn)
×
1598
                }
×
1599
                if hostInfo.fabricPathDn != fabricPathDn {
×
1600
                        hostInfo.fabricPathDn = fabricPathDn
×
1601
                        cont.log.Info("Updated fabricPathDn of ipg :", hostInfo.host, " to: ", hostInfo.fabricPathDn)
×
1602
                        updated = true
×
1603
                }
×
1604
                cont.hostFabricPathDnMap[accBndlGrpDn] = hostInfo
×
1605
        }
1606
        cont.indexMutex.Unlock()
×
1607
        return updated
×
1608
}
1609

1610
func (cont *AciController) infraRtAttEntPChanged(obj apicapi.ApicObject) {
×
1611
        var tdn string
×
1612
        for _, body := range obj {
×
1613
                var ok bool
×
1614
                tdn, ok = body.Attributes["tDn"].(string)
×
1615
                if !ok || (ok && tdn == "") {
×
1616
                        cont.log.Error("tDn missing/empty in infraRtAttEntP")
×
1617
                        return
×
1618
                }
×
1619
        }
1620
        var updated bool
×
1621
        cont.log.Info("infraRtAttEntP updated, tDn : ", tdn)
×
1622

×
1623
        // tdn format for vpc : /uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1624
        // tdn format for single leaf : /uni/infra/funcprof/accportgrp-IPG_CLIENT_SIM
×
1625

×
1626
        // Ignore processing of single leaf
×
1627
        if !strings.Contains(tdn, "/accbundle-") {
×
1628
                cont.log.Info("Skipping processing of infraRtAttEntP update, not applicable for non-VPC configuration: ", tdn)
×
1629
                return
×
1630
        }
×
1631

1632
        // extract esxi1-vpc-ipg
1633
        parts := strings.Split(tdn, "/")
×
1634
        lastPart := parts[len(parts)-1]
×
1635
        host := strings.TrimPrefix(lastPart, "accbundle-")
×
1636

×
1637
        // adding entry for ipg in hostFabricPathDnMap
×
1638
        cont.indexMutex.Lock()
×
1639
        _, exists := cont.hostFabricPathDnMap[tdn]
×
1640
        if !exists {
×
1641
                var hostInfo hostFabricInfo
×
1642
                hostInfo.host = host
×
1643
                hostInfo.vpcIfDn = make(map[string]struct{})
×
1644
                cont.hostFabricPathDnMap[tdn] = hostInfo
×
1645
        }
×
1646
        cont.indexMutex.Unlock()
×
1647

×
1648
        accBndlGrpFilter := fmt.Sprintf(`query-target-filter=and(eq(vpcIf.accBndlGrpDn,"%s"))`, tdn)
×
1649
        url := fmt.Sprintf("/api/class/vpcIf.json?%s", accBndlGrpFilter)
×
1650
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1651
        if err != nil {
×
1652
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1653
                return
×
1654
        }
×
1655

1656
        for _, obj := range apicresp.Imdata {
×
1657
                if cont.updateHostFabricPathDnMap(obj) && !updated {
×
1658
                        updated = true
×
1659
                }
×
1660
        }
1661

1662
        if updated {
×
1663
                cont.updateDeviceCluster()
×
1664
        }
×
1665
        return
×
1666
}
1667

1668
func (cont *AciController) opflexDeviceChanged(obj apicapi.ApicObject) {
1✔
1669
        devType := obj.GetAttrStr("devType")
1✔
1670
        domName := obj.GetAttrStr("domName")
1✔
1671
        ctrlrName := obj.GetAttrStr("ctrlrName")
1✔
1672

1✔
1673
        if !cont.config.DisableServiceVlanPreprovisioning && strings.Contains(cont.config.Flavor, "openstack") {
1✔
1674
                if cont.openStackOpflexOdevUpdate(obj) {
×
1675
                        cont.log.Info("OpenStack opflexODev for ", obj.GetAttrStr("hostName"), " is added")
×
1676
                        cont.updateDeviceCluster()
×
1677
                }
×
1678
        }
1679
        if (devType == cont.env.OpFlexDeviceType()) && (domName == cont.config.AciVmmDomain) && (ctrlrName == cont.config.AciVmmController) {
2✔
1680
                cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Processing opflex device update")
1✔
1681
                if obj.GetAttrStr("state") == "disconnected" {
2✔
1682
                        cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Opflex device disconnected")
1✔
1683
                        cont.indexMutex.Lock()
1✔
1684
                        for node, devices := range cont.nodeOpflexDevice {
1✔
1685
                                if node == obj.GetAttrStr("hostName") {
×
1686
                                        for _, device := range devices {
×
1687
                                                if device.GetDn() == obj.GetDn() {
×
1688
                                                        device.SetAttr("state", "disconnected")
×
1689
                                                        cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Debug("Opflex device cache updated for disconnected node")
×
1690
                                                }
×
1691
                                        }
1692
                                        cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", devices)
×
1693
                                        break
×
1694
                                }
1695
                        }
1696
                        cont.indexMutex.Unlock()
1✔
1697
                        cont.updateDeviceCluster()
1✔
1698
                        return
1✔
1699
                }
1700
                var nodeUpdates []string
1✔
1701

1✔
1702
                cont.indexMutex.Lock()
1✔
1703
                nodefound := false
1✔
1704
                for node, devices := range cont.nodeOpflexDevice {
2✔
1705
                        found := false
1✔
1706

1✔
1707
                        if node == obj.GetAttrStr("hostName") {
2✔
1708
                                nodefound = true
1✔
1709
                        }
1✔
1710

1711
                        for i, device := range devices {
2✔
1712
                                if device.GetDn() != obj.GetDn() {
2✔
1713
                                        continue
1✔
1714
                                }
1715
                                found = true
1✔
1716

1✔
1717
                                if obj.GetAttrStr("hostName") != node {
2✔
1718
                                        cont.fabricPathLogger(node, device).
1✔
1719
                                                Debug("Moving opflex device from node")
1✔
1720

1✔
1721
                                        devices = append(devices[:i], devices[i+1:]...)
1✔
1722
                                        cont.nodeOpflexDevice[node] = devices
1✔
1723
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1724
                                        break
1✔
1725
                                } else if (device.GetAttrStr("mac") != obj.GetAttrStr("mac")) ||
1✔
1726
                                        (device.GetAttrStr("fabricPathDn") != obj.GetAttrStr("fabricPathDn")) ||
1✔
1727
                                        (device.GetAttrStr("state") != obj.GetAttrStr("state")) {
2✔
1728
                                        cont.fabricPathLogger(node, obj).
1✔
1729
                                                Debug("Updating opflex device")
1✔
1730

1✔
1731
                                        devices = append(append(devices[:i], devices[i+1:]...), obj)
1✔
1732
                                        cont.nodeOpflexDevice[node] = devices
1✔
1733
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1734
                                        break
1✔
1735
                                }
1736
                        }
1737
                        if !found && obj.GetAttrStr("hostName") == node {
2✔
1738
                                cont.fabricPathLogger(node, obj).
1✔
1739
                                        Debug("Appending opflex device")
1✔
1740

1✔
1741
                                devices = append(devices, obj)
1✔
1742
                                cont.nodeOpflexDevice[node] = devices
1✔
1743
                                nodeUpdates = append(nodeUpdates, node)
1✔
1744
                        }
1✔
1745
                }
1746
                if !nodefound {
2✔
1747
                        node := obj.GetAttrStr("hostName")
1✔
1748
                        cont.fabricPathLogger(node, obj).Debug("Adding opflex device")
1✔
1749
                        cont.nodeOpflexDevice[node] = apicapi.ApicSlice{obj}
1✔
1750
                        nodeUpdates = append(nodeUpdates, node)
1✔
1751
                }
1✔
1752
                cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", cont.nodeOpflexDevice[obj.GetAttrStr("hostName")])
1✔
1753
                cont.indexMutex.Unlock()
1✔
1754

1✔
1755
                for _, node := range nodeUpdates {
2✔
1756
                        cont.env.NodeServiceChanged(node)
1✔
1757
                        cont.erspanSyncOpflexDev()
1✔
1758
                }
1✔
1759
                cont.updateDeviceCluster()
1✔
1760
        }
1761
}
1762

1763
func (cont *AciController) postOpflexDeviceDelete(nodes []string) {
1✔
1764
        cont.updateDeviceCluster()
1✔
1765
        for _, node := range nodes {
2✔
1766
                cont.env.NodeServiceChanged(node)
1✔
1767
                cont.erspanSyncOpflexDev()
1✔
1768
        }
1✔
1769
}
1770

1771
func (cont *AciController) opflexDeviceDeleted(dn string) {
1✔
1772
        var nodeUpdates []string
1✔
1773
        var dnFound bool //to check if the dn belongs to this cluster
1✔
1774
        cont.log.Info("Processing opflex device delete notification of ", dn)
1✔
1775
        cont.indexMutex.Lock()
1✔
1776
        for node, devices := range cont.nodeOpflexDevice {
2✔
1777
                for i, device := range devices {
2✔
1778
                        if device.GetDn() != dn {
2✔
1779
                                continue
1✔
1780
                        }
1781
                        dnFound = true
1✔
1782
                        cont.fabricPathLogger(node, device).
1✔
1783
                                Debug("Deleting opflex device path")
1✔
1784
                        devices = append(devices[:i], devices[i+1:]...)
1✔
1785
                        cont.nodeOpflexDevice[node] = devices
1✔
1786
                        cont.log.Info("Deleted opflex device of node ", node, ": ", dn)
1✔
1787
                        nodeUpdates = append(nodeUpdates, node)
1✔
1788
                        break
1✔
1789
                }
1790
                if len(devices) == 0 {
2✔
1791
                        delete(cont.nodeOpflexDevice, node)
1✔
1792
                }
1✔
1793
        }
1794

1795
        // For clusters other than OpenShift On OpenStack,
1796
        // openStackFabricPathDnMap will be empty
1797
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1798
                if _, ok := opflexOdevInfo.opflexODevDn[dn]; ok {
×
1799
                        cont.log.Info("Received OpenStack opflexODev delete notification for ", dn)
×
1800
                        delete(opflexOdevInfo.opflexODevDn, dn)
×
1801
                        if len(opflexOdevInfo.opflexODevDn) < 1 {
×
1802
                                delete(cont.openStackFabricPathDnMap, host)
×
1803
                                cont.log.Info("OpenStack opflexODev of host ", host, " is deleted from cache")
×
1804
                                dnFound = true
×
1805
                        } else {
×
1806
                                cont.openStackFabricPathDnMap[host] = opflexOdevInfo
×
1807
                        }
×
1808
                        break
×
1809
                }
1810
        }
1811
        cont.indexMutex.Unlock()
1✔
1812

1✔
1813
        if dnFound {
2✔
1814
                cont.postOpflexDeviceDelete(nodeUpdates)
1✔
1815
        }
1✔
1816
}
1817

1818
func (cont *AciController) writeApicSvc(key string, service *v1.Service) {
1✔
1819
        if cont.isCNOEnabled() {
1✔
1820
                return
×
1821
        }
×
1822
        aobj := apicapi.NewVmmInjectedSvc(cont.vmmDomainProvider(),
1✔
1823
                cont.config.AciVmmDomain, cont.config.AciVmmController,
1✔
1824
                service.Namespace, service.Name)
1✔
1825
        aobjDn := aobj.GetDn()
1✔
1826
        aobj.SetAttr("guid", string(service.UID))
1✔
1827

1✔
1828
        svcns := service.ObjectMeta.Namespace
1✔
1829
        _, exists, err := cont.namespaceIndexer.GetByKey(svcns)
1✔
1830
        if err != nil {
1✔
1831
                cont.log.Error("Failed to lookup ns : ", svcns, " ", err)
×
1832
                return
×
1833
        }
×
1834
        if !exists {
2✔
1835
                cont.log.Debug("Namespace of service ", service.ObjectMeta.Name, ": ", svcns, " doesn't exist, hence not sending an update to the APIC")
1✔
1836
                return
1✔
1837
        }
1✔
1838

1839
        if !cont.serviceEndPoints.SetServiceApicObject(aobj, service) {
2✔
1840
                return
1✔
1841
        }
1✔
1842
        var setApicSvcDnsName bool
1✔
1843
        if len(cont.config.ApicHosts) != 0 && apicapi.ApicVersion >= "5.1" {
1✔
1844
                setApicSvcDnsName = true
×
1845
        }
×
1846
        // APIC model only allows one of these
1847
        for _, ingress := range service.Status.LoadBalancer.Ingress {
1✔
1848
                if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1849
                        aobj.SetAttr("lbIp", ingress.IP)
×
1850
                } else if ingress.Hostname != "" {
×
1851
                        ipList, err := net.LookupHost(ingress.Hostname)
×
1852
                        if err == nil && len(ipList) > 0 {
×
1853
                                aobj.SetAttr("lbIp", ipList[0])
×
1854
                        } else {
×
1855
                                cont.log.Errorf("Lookup: err: %v, ipList: %+v", err, ipList)
×
1856
                        }
×
1857
                }
1858
                break
×
1859
        }
1860
        if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != "None" {
2✔
1861
                aobj.SetAttr("clusterIp", service.Spec.ClusterIP)
1✔
1862
        }
1✔
1863

1864
        var t string
1✔
1865
        switch service.Spec.Type {
1✔
1866
        case v1.ServiceTypeClusterIP:
×
1867
                t = "clusterIp"
×
1868
        case v1.ServiceTypeNodePort:
×
1869
                t = "nodePort"
×
1870
        case v1.ServiceTypeLoadBalancer:
1✔
1871
                t = "loadBalancer"
1✔
1872
        case v1.ServiceTypeExternalName:
×
1873
                t = "externalName"
×
1874
        }
1875
        if t != "" {
2✔
1876
                aobj.SetAttr("type", t)
1✔
1877
        }
1✔
1878

1879
        if setApicSvcDnsName || cont.config.Flavor == "k8s-overlay" {
1✔
1880
                dnsName := fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace)
×
1881

×
1882
                for _, ingress := range service.Status.LoadBalancer.Ingress {
×
1883
                        if ingress.Hostname != "" {
×
1884
                                aobj.SetAttr("dnsName", ingress.Hostname)
×
1885
                        } else if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1886
                                aobj.SetAttr("dnsName", dnsName)
×
1887
                        }
×
1888
                }
1889
                if t == "clusterIp" || t == "nodePort" || t == "externalName" {
×
1890
                        aobj.SetAttr("dnsName", dnsName)
×
1891
                }
×
1892
        }
1893
        for _, port := range service.Spec.Ports {
2✔
1894
                proto := getProtocolStr(port.Protocol)
1✔
1895
                p := apicapi.NewVmmInjectedSvcPort(aobjDn,
1✔
1896
                        strconv.Itoa(int(port.Port)), proto, port.TargetPort.String())
1✔
1897
                p.SetAttr("nodePort", strconv.Itoa(int(port.NodePort)))
1✔
1898
                aobj.AddChild(p)
1✔
1899
        }
1✔
1900
        if cont.config.EnableVmmInjectedLabels && service.ObjectMeta.Labels != nil && apicapi.ApicVersion >= "5.2" {
1✔
1901
                for key, val := range service.ObjectMeta.Labels {
×
1902
                        newLabelKey := cont.aciNameForKey("label", key)
×
1903
                        label := apicapi.NewVmmInjectedLabel(aobj.GetDn(),
×
1904
                                newLabelKey, val)
×
1905
                        aobj.AddChild(label)
×
1906
                }
×
1907
        }
1908
        name := cont.aciNameForKey("service-vmm", key)
1✔
1909
        cont.log.Debug("Write Service Object: ", aobj)
1✔
1910
        cont.apicConn.WriteApicObjects(name, apicapi.ApicSlice{aobj})
1✔
1911
        cont.log.Debugf("svcObject: %+v", aobj)
1✔
1912
}
1913

1914
func removeAllConditions(conditions []metav1.Condition, conditionType string) []metav1.Condition {
1✔
1915
        i := 0
1✔
1916
        for _, cond := range conditions {
1✔
1917
                if cond.Type != conditionType {
×
1918
                        conditions[i] = cond
×
1919
                }
×
1920
        }
1921
        return conditions[:i]
1✔
1922
}
1923

1924
func (cont *AciController) updateServiceCondition(service *v1.Service, success bool, reason string, message string) bool {
1✔
1925
        conditionType := "LbIpamAllocation"
1✔
1926

1✔
1927
        var condition metav1.Condition
1✔
1928
        if success {
2✔
1929
                condition.Status = metav1.ConditionTrue
1✔
1930
        } else {
2✔
1931
                condition.Status = metav1.ConditionFalse
1✔
1932
                condition.Message = message
1✔
1933
        }
1✔
1934
        condition.Type = conditionType
1✔
1935
        condition.Reason = reason
1✔
1936
        condition.LastTransitionTime = metav1.Time{time.Now()}
1✔
1937
        for _, cond := range service.Status.Conditions {
2✔
1938
                if cond.Type == conditionType &&
1✔
1939
                        cond.Status == condition.Status &&
1✔
1940
                        cond.Message == condition.Message &&
1✔
1941
                        cond.Reason == condition.Reason {
2✔
1942
                        return false
1✔
1943
                }
1✔
1944
        }
1945

1946
        service.Status.Conditions = removeAllConditions(service.Status.Conditions, conditionType)
1✔
1947
        service.Status.Conditions = append(service.Status.Conditions, condition)
1✔
1948
        return true
1✔
1949
}
1950

1951
func (cont *AciController) validateRequestedIps(lbIpList []string) (net.IP, net.IP, bool) {
1✔
1952
        var ipv4, ipv6 net.IP
1✔
1953
        for _, lbIp := range lbIpList {
2✔
1954
                ip := net.ParseIP(lbIp)
1✔
1955
                if ip != nil {
2✔
1956
                        if ip.To4() != nil {
2✔
1957
                                if ipv4.Equal(net.IP{}) {
2✔
1958
                                        ipv4 = ip
1✔
1959
                                } else {
2✔
1960
                                        cont.log.Error("Annotation should have only one ipv4")
1✔
1961
                                        return ipv4, ipv6, false
1✔
1962
                                }
1✔
1963
                        } else if ip.To16() != nil {
2✔
1964
                                if ipv6.Equal(net.IP{}) {
2✔
1965
                                        ipv6 = ip
1✔
1966
                                } else {
2✔
1967
                                        cont.log.Error("Annotation should have only one ipv6")
1✔
1968
                                        return ipv4, ipv6, false
1✔
1969
                                }
1✔
1970
                        }
1971
                }
1972
        }
1973
        return ipv4, ipv6, true
1✔
1974
}
1975

1976
func (cont *AciController) returnUnusedStaticIngressIps(staticIngressIps, requestedIps []net.IP) {
1✔
1977
        for _, staticIp := range staticIngressIps {
2✔
1978
                found := false
1✔
1979
                for _, reqIp := range requestedIps {
2✔
1980
                        if reqIp.Equal(staticIp) {
2✔
1981
                                found = true
1✔
1982
                        }
1✔
1983
                }
1984
                if !found {
2✔
1985
                        returnIps(cont.staticServiceIps, []net.IP{staticIp})
1✔
1986
                }
1✔
1987
        }
1988
}
1989

1990
// allocateServiceIps reconciles the load balancer ingress IPs recorded for
// service and publishes the result in service.Status.LoadBalancer.Ingress.
//
// While holding cont.indexMutex it looks up (or creates) the serviceMeta entry
// for servicekey. When the entry is new, IPs already listed in the service
// status are re-claimed from the dynamic pool (cont.serviceIps) or, failing
// that, from the static pool (cont.staticServiceIps). If
// cont.serviceSyncEnabled is false the function then stops and returns false.
//
// Requested IPs are read from the metadata.LbIpAnnotation annotation (a
// comma-separated list checked by validateRequestedIps) or, when the
// annotation is absent, from service.Spec.LoadBalancerIP. Requested IPs are
// taken from the static pool, static IPs that are held but no longer requested
// are handed to returnUnusedStaticIngressIps, and the dynamic IPs are handed
// to returnServiceIps. When nothing is requested, one IPv4 and/or one IPv6
// address is allocated from cont.serviceIps for each address family that has a
// cluster IP but no ingress IP yet; an ingress IP whose family has no cluster
// IP is removed from the list.
//
// The outcome is recorded through updateServiceCondition and written with
// updateServiceStatus when the ingress list or the condition changed.
//
// Returns true when a status update fails or when, at the end of allocation,
// neither an IPv4 nor an IPv6 ingress address is held. Returns false
// otherwise, including when the annotation is invalid or none of the
// requested IPs is available. handleServiceUpdate uses the value as its
// requeue result.
func (cont *AciController) allocateServiceIps(servicekey string,
1991
        service *v1.Service) bool {
1✔
1992
        logger := serviceLogger(cont.log, service)
1✔
1993
        cont.indexMutex.Lock()
1✔
1994
        meta, ok := cont.serviceMetaCache[servicekey]
1✔
1995
        if !ok {
2✔
1996
                meta = &serviceMeta{}
1✔
1997
                cont.serviceMetaCache[servicekey] = meta
1✔
1998

1✔
1999
                // Read any existing IPs from the service status and attempt to
                // claim them for this service: dynamic pool first, then static pool
1✔
2000
                for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
2001
                        ip := net.ParseIP(ingress.IP)
1✔
2002
                        if ip == nil {
1✔
2003
                                continue
×
2004
                        }
2005
                        if ip.To4() != nil {
2✔
2006
                                if cont.serviceIps.GetV4IpCache()[0].RemoveIp(ip) {
2✔
2007
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
2008
                                } else if cont.staticServiceIps.V4.RemoveIp(ip) {
3✔
2009
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
2010
                                }
1✔
2011
                        } else if ip.To16() != nil {
2✔
2012
                                if cont.serviceIps.GetV6IpCache()[0].RemoveIp(ip) {
2✔
2013
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
2014
                                } else if cont.staticServiceIps.V6.RemoveIp(ip) {
3✔
2015
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
2016
                                }
1✔
2017
                        }
2018
                }
2019
        }
2020

2021
        if !cont.serviceSyncEnabled {
2✔
2022
                cont.indexMutex.Unlock()
1✔
2023
                return false
1✔
2024
        }
1✔
2025

2026
        var requestedIps []net.IP
1✔
2027
        // try to give the requested load balancer IP(s) to the service; the
        // annotation takes precedence over Spec.LoadBalancerIP
1✔
2028
        lbIps, ok := service.ObjectMeta.Annotations[metadata.LbIpAnnotation]
1✔
2029
        if ok {
2✔
2030
                lbIpList := strings.Split(lbIps, ",")
1✔
2031
                ipv4, ipv6, valid := cont.validateRequestedIps(lbIpList)
1✔
2032
                if valid {
2✔
2033
                        if ipv4 != nil {
2✔
2034
                                requestedIps = append(requestedIps, ipv4)
1✔
2035
                        }
1✔
2036
                        if ipv6 != nil {
2✔
2037
                                requestedIps = append(requestedIps, ipv6)
1✔
2038
                        }
1✔
2039
                } else {
1✔
2040
                        // NOTE(review): meta.ingressIps is handed back here but not
                        // cleared, unlike the requested-IP path below - confirm a
                        // later call cannot return the same IPs a second time.
                        cont.returnServiceIps(meta.ingressIps)
1✔
2041
                        cont.log.Error("Invalid LB IP annotation for service ", servicekey)
1✔
2042
                        condUpdated := cont.updateServiceCondition(service, false, "InvalidAnnotation", "Invalid Loadbalancer IP annotation")
1✔
2043
                        if condUpdated {
2✔
2044
                                _, err := cont.updateServiceStatus(service)
1✔
2045
                                if err != nil {
1✔
2046
                                        logger.Error("Failed to update service status : ", err)
×
2047
                                        cont.indexMutex.Unlock()
×
2048
                                        return true
×
2049
                                }
×
2050
                        }
2051
                        cont.indexMutex.Unlock()
1✔
2052
                        return false
1✔
2053
                }
2054
        } else {
1✔
2055
                requestedIp := net.ParseIP(service.Spec.LoadBalancerIP)
1✔
2056
                if requestedIp != nil {
2✔
2057
                        requestedIps = append(requestedIps, requestedIp)
1✔
2058
                }
1✔
2059
        }
2060
        if len(requestedIps) > 0 {
2✔
2061
                meta.requestedIps = []net.IP{}
1✔
2062
                for _, requestedIp := range requestedIps {
2✔
2063
                        // A requested IP counts as obtained if this service already
                        // holds it as a static ingress IP, or if it can be removed
                        // from the static pool now.
                        hasRequestedIp := false
1✔
2064
                        for _, ip := range meta.staticIngressIps {
2✔
2065
                                if reflect.DeepEqual(requestedIp, ip) {
2✔
2066
                                        hasRequestedIp = true
1✔
2067
                                }
1✔
2068
                        }
2069
                        if !hasRequestedIp {
2✔
2070
                                if requestedIp.To4() != nil &&
1✔
2071
                                        cont.staticServiceIps.V4.RemoveIp(requestedIp) {
2✔
2072
                                        hasRequestedIp = true
1✔
2073
                                } else if requestedIp.To16() != nil &&
2✔
2074
                                        cont.staticServiceIps.V6.RemoveIp(requestedIp) {
2✔
2075
                                        hasRequestedIp = true
1✔
2076
                                }
1✔
2077
                        }
2078
                        if hasRequestedIp {
2✔
2079
                                meta.requestedIps = append(meta.requestedIps, requestedIp)
1✔
2080
                        }
1✔
2081
                }
2082
                cont.returnUnusedStaticIngressIps(meta.staticIngressIps, meta.requestedIps)
1✔
2083
                meta.staticIngressIps = meta.requestedIps
1✔
2084
                cont.returnServiceIps(meta.ingressIps)
1✔
2085
                meta.ingressIps = nil
1✔
2086
                // If none of the requested IPs could be obtained, record the
                // failure on the service and stop
1✔
2087
                if len(meta.requestedIps) < 1 {
2✔
2088
                        logger.Error("No Requested Ip addresses available for service ", servicekey)
1✔
2089
                        condUpdated := cont.updateServiceCondition(service, false, "RequestedIpsNotAllocatable", "The requested ips for loadbalancer service are not available or not in extern static range")
1✔
2090
                        if condUpdated {
2✔
2091
                                _, err := cont.updateServiceStatus(service)
1✔
2092
                                if err != nil {
1✔
2093
                                        cont.indexMutex.Unlock()
×
2094
                                        logger.Error("Failed to update service status: ", err)
×
2095
                                        return true
×
2096
                                }
×
2097
                        }
2098
                        cont.indexMutex.Unlock()
1✔
2099
                        return false
1✔
2100
                }
2101
        } else if len(meta.requestedIps) > 0 {
1✔
2102
                // Nothing is requested any more but an earlier call recorded
                // requested IPs: give the static IPs back.
                meta.requestedIps = nil
×
2103
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
×
2104
                meta.staticIngressIps = nil
×
2105
        }
×
2106
        ingressIps := make([]net.IP, 0)
1✔
2107
        ingressIps = append(ingressIps, meta.ingressIps...)
1✔
2108
        ingressIps = append(ingressIps, meta.staticIngressIps...)
1✔
2109

1✔
2110
        // Find the ingress IP currently held per address family; if several
        // of one family are present the last one wins.
        var ipv4, ipv6 net.IP
1✔
2111
        for _, ip := range ingressIps {
2✔
2112
                if ip.To4() != nil {
2✔
2113
                        ipv4 = ip
1✔
2114
                } else if ip.To16() != nil {
3✔
2115
                        ipv6 = ip
1✔
2116
                }
1✔
2117
        }
2118
        // Determine which address families the service has a cluster IP for.
        var clusterIPv4, clusterIPv6 net.IP
1✔
2119
        clusterIPs := append([]string{service.Spec.ClusterIP}, service.Spec.ClusterIPs...)
1✔
2120
        for _, ipStr := range clusterIPs {
2✔
2121
                ip := net.ParseIP(ipStr)
1✔
2122
                if ip == nil {
1✔
2123
                        continue
×
2124
                }
2125
                if ip.To4() != nil && clusterIPv4 == nil {
2✔
2126
                        clusterIPv4 = ip
1✔
2127
                } else if ip.To16() != nil && strings.Contains(ip.String(), ":") && clusterIPv6 == nil {
3✔
2128
                        clusterIPv6 = ip
1✔
2129
                }
1✔
2130
        }
2131
        if clusterIPv4 != nil && ipv4 == nil {
2✔
2132
                if len(requestedIps) < 1 {
2✔
2133
                        ipv4, _ = cont.serviceIps.AllocateIp(true)
1✔
2134
                        if ipv4 != nil {
2✔
2135
                                ingressIps = append(ingressIps, ipv4)
1✔
2136
                        }
1✔
2137
                }
2138
        } else if clusterIPv4 == nil && ipv4 != nil {
1✔
2139
                cont.removeIpFromIngressIPList(&ingressIps, ipv4)
×
2140
        }
×
2141

2142
        if clusterIPv6 != nil && ipv6 == nil {
2✔
2143
                if len(requestedIps) < 1 {
2✔
2144
                        ipv6, _ = cont.serviceIps.AllocateIp(false)
1✔
2145
                        if ipv6 != nil {
2✔
2146
                                ingressIps = append(ingressIps, ipv6)
1✔
2147
                        }
1✔
2148
                }
2149
        } else if clusterIPv6 == nil && ipv6 != nil {
1✔
2150
                cont.removeIpFromIngressIPList(&ingressIps, ipv6)
×
2151
        }
×
2152

2153
        // With explicit requests meta.ingressIps was cleared above and stays
        // that way; only the no-request path stores the computed list.
        if len(requestedIps) < 1 {
2✔
2154
                meta.ingressIps = ingressIps
1✔
2155
        }
1✔
2156
        if ipv4 == nil && ipv6 == nil {
2✔
2157
                logger.Error("No IP addresses available for service")
1✔
2158
                cont.indexMutex.Unlock()
1✔
2159
                return true
1✔
2160
        }
1✔
2161
        // NOTE(review): meta is read below after indexMutex is released -
        // confirm nothing else can mutate this entry concurrently.
        cont.indexMutex.Unlock()
1✔
2162
        var newIngress []v1.LoadBalancerIngress
1✔
2163
        for _, ip := range meta.ingressIps {
2✔
2164
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2165
        }
1✔
2166
        for _, ip := range meta.staticIngressIps {
2✔
2167
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2168
        }
1✔
2169

2170
        ipUpdated := false
1✔
2171
        if !reflect.DeepEqual(newIngress, service.Status.LoadBalancer.Ingress) {
2✔
2172
                service.Status.LoadBalancer.Ingress = newIngress
1✔
2173

1✔
2174
                logger.WithFields(logrus.Fields{
1✔
2175
                        "status": service.Status.LoadBalancer.Ingress,
1✔
2176
                }).Info("Updating service load balancer status")
1✔
2177

1✔
2178
                ipUpdated = true
1✔
2179
        }
1✔
2180

2181
        success := true
1✔
2182
        reason := "Success"
1✔
2183
        message := ""
1✔
2184
        // Fewer static IPs held than were requested: report a partial failure.
        if len(requestedIps) > 0 && len(requestedIps) != len(meta.staticIngressIps) {
1✔
2185
                success = false
×
2186
                reason = "OneIpNotAllocatable"
×
2187
                message = "One of the requested Ips is not allocatable"
×
2188
        }
×
2189
        condUpdated := cont.updateServiceCondition(service, success, reason, message)
1✔
2190
        if ipUpdated || condUpdated {
2✔
2191
                _, err := cont.updateServiceStatus(service)
1✔
2192
                if err != nil {
1✔
2193
                        logger.Error("Failed to update service status: ", err)
×
2194
                        return true
×
2195
                }
×
2196
        }
2197
        return false
1✔
2198
}
2199

2200
func (cont *AciController) handleServiceDelete(servicekey string) bool {
1✔
2201
        if cont.isCNOEnabled() {
1✔
2202
                return false
×
2203
        }
×
2204
        cont.clearLbService(servicekey)
1✔
2205
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("service-vmm",
1✔
2206
                servicekey))
1✔
2207
        return false
1✔
2208
}
2209

2210
func (cont *AciController) handleServiceUpdate(service *v1.Service) bool {
1✔
2211
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2212
        if err != nil {
1✔
2213
                serviceLogger(cont.log, service).
×
2214
                        Error("Could not create service key: ", err)
×
2215
                return false
×
2216
        }
×
2217
        if cont.isCNOEnabled() {
1✔
2218
                return false
×
2219
        }
×
2220
        var requeue bool
1✔
2221
        isLoadBalancer := service.Spec.Type == v1.ServiceTypeLoadBalancer
1✔
2222
        if isLoadBalancer {
2✔
2223
                if *cont.config.AllocateServiceIps {
2✔
2224
                        requeue = cont.allocateServiceIps(servicekey, service)
1✔
2225
                }
1✔
2226
                cont.indexMutex.Lock()
1✔
2227
                if cont.serviceSyncEnabled {
2✔
2228
                        cont.indexMutex.Unlock()
1✔
2229
                        err = cont.updateServiceDeviceInstance(servicekey, service)
1✔
2230
                        if err != nil {
1✔
2231
                                serviceLogger(cont.log, service).
×
2232
                                        Error("Failed to update service device Instance: ", err)
×
2233
                                return true
×
2234
                        }
×
2235
                } else {
1✔
2236
                        cont.indexMutex.Unlock()
1✔
2237
                }
1✔
2238
        } else {
1✔
2239
                cont.clearLbService(servicekey)
1✔
2240
        }
1✔
2241
        cont.writeApicSvc(servicekey, service)
1✔
2242
        return requeue
1✔
2243
}
2244

2245
func (cont *AciController) clearLbService(servicekey string) {
1✔
2246
        cont.indexMutex.Lock()
1✔
2247
        if meta, ok := cont.serviceMetaCache[servicekey]; ok {
2✔
2248
                cont.returnServiceIps(meta.ingressIps)
1✔
2249
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
1✔
2250
                delete(cont.serviceMetaCache, servicekey)
1✔
2251
        }
1✔
2252
        cont.indexMutex.Unlock()
1✔
2253
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("svc", servicekey))
1✔
2254
}
2255

2256
func getEndpointsIps(endpoints *v1.Endpoints) map[string]bool {
1✔
2257
        ips := make(map[string]bool)
1✔
2258
        for _, subset := range endpoints.Subsets {
2✔
2259
                for _, addr := range subset.Addresses {
2✔
2260
                        ips[addr.IP] = true
1✔
2261
                }
1✔
2262
                for _, addr := range subset.NotReadyAddresses {
1✔
2263
                        ips[addr.IP] = true
×
2264
                }
×
2265
        }
2266
        return ips
1✔
2267
}
2268

2269
func getServiceTargetPorts(service *v1.Service) map[string]targetPort {
1✔
2270
        ports := make(map[string]targetPort)
1✔
2271
        for _, port := range service.Spec.Ports {
2✔
2272
                portNum := port.TargetPort.IntValue()
1✔
2273
                if portNum <= 0 {
2✔
2274
                        portNum = int(port.Port)
1✔
2275
                }
1✔
2276
                key := portProto(&port.Protocol) + "-num-" + strconv.Itoa(portNum)
1✔
2277
                ports[key] = targetPort{
1✔
2278
                        proto: port.Protocol,
1✔
2279
                        ports: []int{portNum},
1✔
2280
                }
1✔
2281
        }
2282
        return ports
1✔
2283
}
2284

2285
func (cont *AciController) endpointsAdded(obj interface{}) {
1✔
2286
        endpoints := obj.(*v1.Endpoints)
1✔
2287
        servicekey, err := cache.MetaNamespaceKeyFunc(obj.(*v1.Endpoints))
1✔
2288
        if err != nil {
1✔
2289
                cont.log.Error("Could not create service key: ", err)
×
2290
                return
×
2291
        }
×
2292

2293
        ips := getEndpointsIps(endpoints)
1✔
2294
        cont.indexMutex.Lock()
1✔
2295
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2296
        cont.queueIPNetPolUpdates(ips)
1✔
2297
        cont.indexMutex.Unlock()
1✔
2298

1✔
2299
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2300

1✔
2301
        cont.queueServiceUpdateByKey(servicekey)
1✔
2302
}
2303

2304
func (cont *AciController) endpointsDeleted(obj interface{}) {
1✔
2305
        endpoints, isEndpoints := obj.(*v1.Endpoints)
1✔
2306
        if !isEndpoints {
1✔
2307
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2308
                if !ok {
×
2309
                        cont.log.Error("Received unexpected object: ", obj)
×
2310
                        return
×
2311
                }
×
2312
                endpoints, ok = deletedState.Obj.(*v1.Endpoints)
×
2313
                if !ok {
×
2314
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpoints object: ", deletedState.Obj)
×
2315
                        return
×
2316
                }
×
2317
        }
2318
        servicekey, err := cache.MetaNamespaceKeyFunc(endpoints)
1✔
2319
        if err != nil {
1✔
2320
                cont.log.Error("Could not create service key: ", err)
×
2321
                return
×
2322
        }
×
2323

2324
        ips := getEndpointsIps(endpoints)
1✔
2325
        cont.indexMutex.Lock()
1✔
2326
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2327
        cont.queueIPNetPolUpdates(ips)
1✔
2328
        cont.indexMutex.Unlock()
1✔
2329

1✔
2330
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2331

1✔
2332
        cont.queueServiceUpdateByKey(servicekey)
1✔
2333
}
2334

2335
func (cont *AciController) endpointsUpdated(oldEps, newEps interface{}) {
1✔
2336
        oldendpoints := oldEps.(*v1.Endpoints)
1✔
2337
        newendpoints := newEps.(*v1.Endpoints)
1✔
2338
        servicekey, err := cache.MetaNamespaceKeyFunc(newendpoints)
1✔
2339
        if err != nil {
1✔
2340
                cont.log.Error("Could not create service key: ", err)
×
2341
                return
×
2342
        }
×
2343

2344
        oldIps := getEndpointsIps(oldendpoints)
1✔
2345
        newIps := getEndpointsIps(newendpoints)
1✔
2346
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2347
                cont.indexMutex.Lock()
1✔
2348
                cont.queueIPNetPolUpdates(oldIps)
1✔
2349
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2350
                cont.queueIPNetPolUpdates(newIps)
1✔
2351
                cont.indexMutex.Unlock()
1✔
2352
        }
1✔
2353

2354
        if !reflect.DeepEqual(oldendpoints.Subsets, newendpoints.Subsets) {
2✔
2355
                cont.queueEndpointsNetPolUpdates(oldendpoints)
1✔
2356
                cont.queueEndpointsNetPolUpdates(newendpoints)
1✔
2357
        }
1✔
2358

2359
        cont.queueServiceUpdateByKey(servicekey)
1✔
2360
}
2361

2362
func (cont *AciController) serviceAdded(obj interface{}) {
1✔
2363
        service := obj.(*v1.Service)
1✔
2364
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2365
        if err != nil {
1✔
2366
                serviceLogger(cont.log, service).
×
2367
                        Error("Could not create service key: ", err)
×
2368
                return
×
2369
        }
×
2370

2371
        ports := getServiceTargetPorts(service)
1✔
2372
        cont.indexMutex.Lock()
1✔
2373
        cont.queuePortNetPolUpdates(ports)
1✔
2374
        cont.updateTargetPortIndex(true, servicekey, nil, ports)
1✔
2375
        cont.indexMutex.Unlock()
1✔
2376

1✔
2377
        cont.queueServiceUpdateByKey(servicekey)
1✔
2378
}
2379

2380
func (cont *AciController) serviceUpdated(oldSvc, newSvc interface{}) {
1✔
2381
        oldservice := oldSvc.(*v1.Service)
1✔
2382
        newservice := newSvc.(*v1.Service)
1✔
2383
        servicekey, err := cache.MetaNamespaceKeyFunc(newservice)
1✔
2384
        if err != nil {
1✔
2385
                serviceLogger(cont.log, newservice).
×
2386
                        Error("Could not create service key: ", err)
×
2387
                return
×
2388
        }
×
2389
        oldPorts := getServiceTargetPorts(oldservice)
1✔
2390
        newPorts := getServiceTargetPorts(newservice)
1✔
2391
        if !reflect.DeepEqual(oldPorts, newPorts) {
1✔
2392
                cont.indexMutex.Lock()
×
2393
                cont.queuePortNetPolUpdates(oldPorts)
×
2394
                cont.updateTargetPortIndex(true, servicekey, oldPorts, newPorts)
×
2395
                cont.queuePortNetPolUpdates(newPorts)
×
2396
                cont.indexMutex.Unlock()
×
2397
        }
×
2398
        cont.queueServiceUpdateByKey(servicekey)
1✔
2399
}
2400

2401
func (cont *AciController) serviceDeleted(obj interface{}) {
1✔
2402
        service, isService := obj.(*v1.Service)
1✔
2403
        if !isService {
1✔
2404
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2405
                if !ok {
×
2406
                        serviceLogger(cont.log, service).
×
2407
                                Error("Received unexpected object: ", obj)
×
2408
                        return
×
2409
                }
×
2410
                service, ok = deletedState.Obj.(*v1.Service)
×
2411
                if !ok {
×
2412
                        serviceLogger(cont.log, service).
×
2413
                                Error("DeletedFinalStateUnknown contained non-Services object: ", deletedState.Obj)
×
2414
                        return
×
2415
                }
×
2416
        }
2417
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2418
        if err != nil {
1✔
2419
                serviceLogger(cont.log, service).
×
2420
                        Error("Could not create service key: ", err)
×
2421
                return
×
2422
        }
×
2423

2424
        ports := getServiceTargetPorts(service)
1✔
2425
        cont.indexMutex.Lock()
1✔
2426
        cont.updateTargetPortIndex(true, servicekey, ports, nil)
1✔
2427
        cont.queuePortNetPolUpdates(ports)
1✔
2428
        delete(cont.snatServices, servicekey)
1✔
2429
        cont.indexMutex.Unlock()
1✔
2430

1✔
2431
        deletedServiceKey := "DELETED_" + servicekey
1✔
2432
        cont.queueServiceUpdateByKey(deletedServiceKey)
1✔
2433
}
2434

2435
func (cont *AciController) serviceFullSync() {
1✔
2436
        cache.ListAll(cont.serviceIndexer, labels.Everything(),
1✔
2437
                func(sobj interface{}) {
2✔
2438
                        cont.queueServiceUpdate(sobj.(*v1.Service))
1✔
2439
                })
1✔
2440
}
2441

2442
func (cont *AciController) getEndpointSliceIps(endpointSlice *discovery.EndpointSlice) map[string]bool {
1✔
2443
        ips := make(map[string]bool)
1✔
2444
        for _, endpoints := range endpointSlice.Endpoints {
2✔
2445
                for _, addr := range endpoints.Addresses {
2✔
2446
                        ips[addr] = true
1✔
2447
                }
1✔
2448
        }
2449
        return ips
1✔
2450
}
2451

2452
func (cont *AciController) notReadyEndpointPresent(endpointSlice *discovery.EndpointSlice) bool {
×
2453
        for _, endpoints := range endpointSlice.Endpoints {
×
2454
                if (endpoints.Conditions.Ready != nil && !*endpoints.Conditions.Ready) &&
×
2455
                        (endpoints.Conditions.Terminating == nil || !*endpoints.Conditions.Terminating) {
×
2456
                        return true
×
2457
                }
×
2458
        }
2459
        return false
×
2460
}
2461

2462
func (cont *AciController) getEndpointSliceEpIps(endpoints *discovery.Endpoint) map[string]bool {
×
2463
        ips := make(map[string]bool)
×
2464
        for _, addr := range endpoints.Addresses {
×
2465
                ips[addr] = true
×
2466
        }
×
2467
        return ips
×
2468
}
2469

2470
func (cont *AciController) processDelayedEpSlices() {
1✔
2471
        var processEps []DelayedEpSlice
1✔
2472
        cont.indexMutex.Lock()
1✔
2473
        for i := 0; i < len(cont.delayedEpSlices); i++ {
1✔
2474
                delayedepslice := cont.delayedEpSlices[i]
×
2475
                if time.Now().After(delayedepslice.DelayedTime) {
×
2476
                        var toprocess DelayedEpSlice
×
2477
                        err := util.DeepCopyObj(&delayedepslice, &toprocess)
×
2478
                        if err != nil {
×
2479
                                cont.log.Error(err)
×
2480
                                continue
×
2481
                        }
2482
                        processEps = append(processEps, toprocess)
×
2483
                        cont.delayedEpSlices = append(cont.delayedEpSlices[:i], cont.delayedEpSlices[i+1:]...)
×
2484
                }
2485
        }
2486

2487
        cont.indexMutex.Unlock()
1✔
2488
        for _, epslice := range processEps {
1✔
2489
                //ignore the epslice if newly added endpoint is not ready
×
2490
                if cont.notReadyEndpointPresent(epslice.NewEpSlice) {
×
2491
                        cont.log.Debug("Ignoring the update as the new endpoint is not ready : ", epslice.NewEpSlice)
×
2492
                } else {
×
2493
                        cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice)
×
2494
                        cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice)
×
2495
                }
×
2496
        }
2497
}
2498

2499
func (cont *AciController) endpointSliceAdded(obj interface{}) {
1✔
2500
        endpointslice, ok := obj.(*discovery.EndpointSlice)
1✔
2501
        if !ok {
1✔
2502
                cont.log.Error("error processing Endpointslice object: ", obj)
×
2503
                return
×
2504
        }
×
2505
        servicekey, valid := getServiceKey(endpointslice)
1✔
2506
        if !valid {
1✔
2507
                return
×
2508
        }
×
2509
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2510
        cont.indexMutex.Lock()
1✔
2511
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2512
        cont.queueIPNetPolUpdates(ips)
1✔
2513
        cont.indexMutex.Unlock()
1✔
2514

1✔
2515
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2516

1✔
2517
        cont.queueServiceUpdateByKey(servicekey)
1✔
2518
        cont.log.Info("EndPointSlice Object Added: ", servicekey)
1✔
2519
}
2520

2521
func (cont *AciController) endpointSliceDeleted(obj interface{}) {
1✔
2522
        endpointslice, isEndpointslice := obj.(*discovery.EndpointSlice)
1✔
2523
        if !isEndpointslice {
1✔
2524
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2525
                if !ok {
×
2526
                        cont.log.Error("Received unexpected object: ", obj)
×
2527
                        return
×
2528
                }
×
2529
                endpointslice, ok = deletedState.Obj.(*discovery.EndpointSlice)
×
2530
                if !ok {
×
2531
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpointslice object: ", deletedState.Obj)
×
2532
                        return
×
2533
                }
×
2534
        }
2535
        servicekey, valid := getServiceKey(endpointslice)
1✔
2536
        if !valid {
1✔
2537
                return
×
2538
        }
×
2539
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2540
        cont.indexMutex.Lock()
1✔
2541
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2542
        cont.queueIPNetPolUpdates(ips)
1✔
2543
        cont.indexMutex.Unlock()
1✔
2544
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2545
        cont.queueServiceUpdateByKey(servicekey)
1✔
2546
}
2547

2548
// Checks if the given service is present in the user configured list of services
2549
// for pbr delay and if present, returns the servie specific delay if configured
2550
func (cont *AciController) svcInAddDelayList(name, ns string) (int, bool) {
×
2551
        for _, svc := range cont.config.ServiceGraphEndpointAddDelay.Services {
×
2552
                if svc.Name == name && svc.Namespace == ns {
×
2553
                        return svc.Delay, true
×
2554
                }
×
2555
        }
2556
        return 0, false
×
2557
}
2558

2559
// Check whether the endpointslice update notification involves the deletion of an endpoint
2560
func (cont *AciController) isDeleteEndpointSlice(oldendpointslice, newendpointslice *discovery.EndpointSlice) bool {
×
2561
        del := false
×
2562

×
2563
        // if any endpoint is removed from endpontslice
×
2564
        if len(newendpointslice.Endpoints) < len(oldendpointslice.Endpoints) {
×
2565
                del = true
×
2566
        }
×
2567

2568
        if !del {
×
2569
                // if any one of the endpoint is in terminating state
×
2570
                for _, endpoint := range newendpointslice.Endpoints {
×
2571
                        if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
×
2572
                                del = true
×
2573
                                break
×
2574
                        }
2575
                }
2576
        }
2577
        if !del {
×
2578
                // if any one of endpoint moved from ready state to not-ready state
×
2579
                for ix := range oldendpointslice.Endpoints {
×
2580
                        oldips := cont.getEndpointSliceEpIps(&oldendpointslice.Endpoints[ix])
×
2581
                        for newIx := range newendpointslice.Endpoints {
×
2582
                                newips := cont.getEndpointSliceEpIps(&newendpointslice.Endpoints[newIx])
×
2583
                                if reflect.DeepEqual(oldips, newips) {
×
2584
                                        if (oldendpointslice.Endpoints[ix].Conditions.Ready != nil && *oldendpointslice.Endpoints[ix].Conditions.Ready) &&
×
2585
                                                (newendpointslice.Endpoints[newIx].Conditions.Ready != nil && !*newendpointslice.Endpoints[newIx].Conditions.Ready) {
×
2586
                                                del = true
×
2587
                                        }
×
2588
                                        break
×
2589
                                }
2590
                        }
2591
                }
2592
        }
2593
        return del
×
2594
}
2595

2596
func (cont *AciController) doendpointSliceUpdatedDelay(oldendpointslice *discovery.EndpointSlice,
2597
        newendpointslice *discovery.EndpointSlice) {
×
2598
        svc, ns, valid := getServiceNameAndNs(newendpointslice)
×
2599
        if !valid {
×
2600
                return
×
2601
        }
×
2602
        svckey, valid := getServiceKey(newendpointslice)
×
2603
        if !valid {
×
2604
                return
×
2605
        }
×
2606
        delay := cont.config.ServiceGraphEndpointAddDelay.Delay
×
2607
        svcDelay, exists := cont.svcInAddDelayList(svc, ns)
×
2608
        if svcDelay > 0 {
×
2609
                delay = svcDelay
×
2610
        }
×
2611
        delayedsvc := exists && delay > 0
×
2612
        if delayedsvc {
×
2613
                cont.log.Debug("Delay of ", delay, " seconds is applicable for svc :", svc, " in ns: ", ns)
×
2614
                var delayedepslice DelayedEpSlice
×
2615
                delayedepslice.OldEpSlice = oldendpointslice
×
2616
                delayedepslice.ServiceKey = svckey
×
2617
                delayedepslice.NewEpSlice = newendpointslice
×
2618
                currentTime := time.Now()
×
2619
                delayedepslice.DelayedTime = currentTime.Add(time.Duration(delay) * time.Second)
×
2620
                cont.indexMutex.Lock()
×
2621
                cont.delayedEpSlices = append(cont.delayedEpSlices, &delayedepslice)
×
2622
                cont.indexMutex.Unlock()
×
2623
        } else {
×
2624
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2625
        }
×
2626

2627
        if delayedsvc && cont.isDeleteEndpointSlice(oldendpointslice, newendpointslice) {
×
2628
                cont.log.Debug("Proceeding by ignoring delay as the update is due to delete of endpoint")
×
2629
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2630
        }
×
2631
}
2632

2633
func (cont *AciController) endpointSliceUpdated(oldobj, newobj interface{}) {
1✔
2634
        oldendpointslice, ok := oldobj.(*discovery.EndpointSlice)
1✔
2635
        if !ok {
1✔
2636
                cont.log.Error("error processing Endpointslice object: ", oldobj)
×
2637
                return
×
2638
        }
×
2639
        newendpointslice, ok := newobj.(*discovery.EndpointSlice)
1✔
2640
        if !ok {
1✔
2641
                cont.log.Error("error processing Endpointslice object: ", newobj)
×
2642
                return
×
2643
        }
×
2644
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2645
                cont.doendpointSliceUpdatedDelay(oldendpointslice, newendpointslice)
×
2646
        } else {
1✔
2647
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
1✔
2648
        }
1✔
2649
}
2650

2651
func (cont *AciController) doendpointSliceUpdated(oldendpointslice *discovery.EndpointSlice,
2652
        newendpointslice *discovery.EndpointSlice) {
1✔
2653
        servicekey, valid := getServiceKey(newendpointslice)
1✔
2654
        if !valid {
1✔
2655
                return
×
2656
        }
×
2657
        oldIps := cont.getEndpointSliceIps(oldendpointslice)
1✔
2658
        newIps := cont.getEndpointSliceIps(newendpointslice)
1✔
2659
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2660
                cont.indexMutex.Lock()
1✔
2661
                cont.queueIPNetPolUpdates(oldIps)
1✔
2662
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2663
                cont.queueIPNetPolUpdates(newIps)
1✔
2664
                cont.indexMutex.Unlock()
1✔
2665
        }
1✔
2666

2667
        if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) {
2✔
2668
                cont.queueEndpointSliceNetPolUpdates(oldendpointslice)
1✔
2669
                cont.queueEndpointSliceNetPolUpdates(newendpointslice)
1✔
2670
        }
1✔
2671
        cont.log.Debug("EndPointSlice Object Update: ", servicekey)
1✔
2672
        cont.queueServiceUpdateByKey(servicekey)
1✔
2673
}
2674

2675
func (cont *AciController) queueEndpointSliceNetPolUpdates(endpointslice *discovery.EndpointSlice) {
1✔
2676
        for _, endpoint := range endpointslice.Endpoints {
2✔
2677
                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" ||
1✔
2678
                        endpoint.TargetRef.Namespace == "" || endpoint.TargetRef.Name == "" {
2✔
2679
                        continue
1✔
2680
                }
2681
                if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
1✔
2682
                        continue
×
2683
                }
2684
                podkey := endpoint.TargetRef.Namespace + "/" + endpoint.TargetRef.Name
1✔
2685
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
2686
                ps := make(map[string]bool)
1✔
2687
                for _, npkey := range npkeys {
2✔
2688
                        cont.queueNetPolUpdateByKey(npkey)
1✔
2689
                }
1✔
2690
                // Process if the  any matching namedport wildcard policy is present
2691
                // ignore np already processed policies
2692
                cont.queueMatchingNamedNp(ps, podkey)
1✔
2693
        }
2694
}
2695

2696
func getServiceKey(endPointSlice *discovery.EndpointSlice) (string, bool) {
1✔
2697
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
1✔
2698
        if !ok {
1✔
2699
                return "", false
×
2700
        }
×
2701
        return endPointSlice.ObjectMeta.Namespace + "/" + serviceName, true
1✔
2702
}
2703

2704
func getServiceNameAndNs(endPointSlice *discovery.EndpointSlice) (string, string, bool) {
×
2705
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
×
2706
        if !ok {
×
2707
                return "", "", false
×
2708
        }
×
2709
        return serviceName, endPointSlice.ObjectMeta.Namespace, true
×
2710
}
2711

2712
// can be called with index lock
2713
func (sep *serviceEndpoint) UpdateServicesForNode(nodename string) {
1✔
2714
        cont := sep.cont
1✔
2715
        cache.ListAll(cont.endpointsIndexer, labels.Everything(),
1✔
2716
                func(endpointsobj interface{}) {
2✔
2717
                        endpoints := endpointsobj.(*v1.Endpoints)
1✔
2718
                        for _, subset := range endpoints.Subsets {
2✔
2719
                                for _, addr := range subset.Addresses {
2✔
2720
                                        if addr.NodeName != nil && *addr.NodeName == nodename {
2✔
2721
                                                servicekey, err :=
1✔
2722
                                                        cache.MetaNamespaceKeyFunc(endpointsobj.(*v1.Endpoints))
1✔
2723
                                                if err != nil {
1✔
2724
                                                        cont.log.Error("Could not create endpoints key: ", err)
×
2725
                                                        return
×
2726
                                                }
×
2727
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2728
                                                return
1✔
2729
                                        }
2730
                                }
2731
                        }
2732
                })
2733
}
2734

2735
func (seps *serviceEndpointSlice) UpdateServicesForNode(nodename string) {
1✔
2736
        // 1. List all the endpointslice and check for matching nodename
1✔
2737
        // 2. if it matches trigger the Service update and mark it visited
1✔
2738
        cont := seps.cont
1✔
2739
        visited := make(map[string]bool)
1✔
2740
        cache.ListAll(cont.endpointSliceIndexer, labels.Everything(),
1✔
2741
                func(endpointSliceobj interface{}) {
2✔
2742
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2743
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2744
                                if endpoint.NodeName != nil && *endpoint.NodeName == nodename {
2✔
2745
                                        servicekey, valid := getServiceKey(endpointSlices)
1✔
2746
                                        if !valid {
1✔
2747
                                                return
×
2748
                                        }
×
2749
                                        if _, ok := visited[servicekey]; !ok {
2✔
2750
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2751
                                                visited[servicekey] = true
1✔
2752
                                                return
1✔
2753
                                        }
1✔
2754
                                }
2755
                        }
2756
                })
2757
}
2758
func (cont *AciController) setNodeMap(nodeMap map[string]*metadata.ServiceEndpoint, nodeName string) {
1✔
2759
        nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
2760
        if !ok {
1✔
2761
                return
×
2762
        }
×
2763
        _, ok = cont.fabricPathForNode(nodeName)
1✔
2764
        if !ok {
2✔
2765
                return
1✔
2766
        }
1✔
2767
        nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
2768
}
2769

2770
// 2 cases when epslices corresponding to given service is presnt in delayedEpSlices:
2771
//  1. endpoint not present in delayedEpSlices of the service
2772
//  2. endpoint present in delayedEpSlices of the service but in not ready state
2773
//
2774
// indexMutex lock must be acquired before calling the function
2775
func (cont *AciController) isDelayedEndpoint(endpoint *discovery.Endpoint, svckey string) bool {
×
2776
        delayed := false
×
2777
        endpointips := cont.getEndpointSliceEpIps(endpoint)
×
2778
        for _, delayedepslices := range cont.delayedEpSlices {
×
2779
                if delayedepslices.ServiceKey == svckey {
×
2780
                        var found bool
×
2781
                        epslice := delayedepslices.OldEpSlice
×
2782
                        for ix := range epslice.Endpoints {
×
2783
                                epips := cont.getEndpointSliceEpIps(&epslice.Endpoints[ix])
×
2784
                                if reflect.DeepEqual(endpointips, epips) {
×
2785
                                        // case 2
×
2786
                                        if epslice.Endpoints[ix].Conditions.Ready != nil && !*epslice.Endpoints[ix].Conditions.Ready {
×
2787
                                                delayed = true
×
2788
                                        }
×
2789
                                        found = true
×
2790
                                }
2791
                        }
2792
                        // case 1
2793
                        if !found {
×
2794
                                delayed = true
×
2795
                        }
×
2796
                }
2797
        }
2798
        return delayed
×
2799
}
2800

2801
// set nodemap only if endoint is ready and not in delayedEpSlices
2802
func (cont *AciController) setNodeMapDelay(nodeMap map[string]*metadata.ServiceEndpoint,
2803
        endpoint *discovery.Endpoint, service *v1.Service) {
×
2804
        svckey, err := cache.MetaNamespaceKeyFunc(service)
×
2805
        if err != nil {
×
2806
                cont.log.Error("Could not create service key: ", err)
×
2807
                return
×
2808
        }
×
2809
        if cont.config.NoWaitForServiceEpReadiness ||
×
2810
                (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {
×
2811
                if endpoint.NodeName != nil && *endpoint.NodeName != "" {
×
2812
                        // donot setNodeMap for endpoint if:
×
2813
                        //   endpoint is newly added
×
2814
                        //   endpoint status changed from not ready to ready
×
2815
                        if !cont.isDelayedEndpoint(endpoint, svckey) {
×
2816
                                cont.setNodeMap(nodeMap, *endpoint.NodeName)
×
2817
                        }
×
2818
                }
2819
        }
2820
}
2821

2822
func (sep *serviceEndpoint) GetnodesMetadata(key string,
2823
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2824
        cont := sep.cont
1✔
2825
        endpointsobj, exists, err := cont.endpointsIndexer.GetByKey(key)
1✔
2826
        if err != nil {
1✔
2827
                cont.log.Error("Could not lookup endpoints for " +
×
2828
                        key + ": " + err.Error())
×
2829
        }
×
2830
        if exists && endpointsobj != nil {
2✔
2831
                endpoints := endpointsobj.(*v1.Endpoints)
1✔
2832
                for _, subset := range endpoints.Subsets {
2✔
2833
                        for _, addr := range subset.Addresses {
2✔
2834
                                if addr.NodeName == nil {
2✔
2835
                                        continue
1✔
2836
                                }
2837
                                cont.setNodeMap(nodeMap, *addr.NodeName)
1✔
2838
                        }
2839
                }
2840
        }
2841
        cont.log.Info("NodeMap: ", nodeMap)
1✔
2842
}
2843

2844
func (seps *serviceEndpointSlice) GetnodesMetadata(key string,
2845
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2846
        cont := seps.cont
1✔
2847
        // 1. Get all the Endpoint slices matching the label service-name
1✔
2848
        // 2. update the node map matching with endpoints nodes name
1✔
2849
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2850
        selector := labels.SelectorFromSet(label)
1✔
2851
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2852
                func(endpointSliceobj interface{}) {
2✔
2853
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2854
                        for ix := range endpointSlices.Endpoints {
2✔
2855
                                if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2856
                                        cont.setNodeMapDelay(nodeMap, &endpointSlices.Endpoints[ix], service)
×
2857
                                } else if cont.config.NoWaitForServiceEpReadiness ||
1✔
2858
                                        (endpointSlices.Endpoints[ix].Conditions.Ready != nil && *endpointSlices.Endpoints[ix].Conditions.Ready) {
2✔
2859
                                        if endpointSlices.Endpoints[ix].NodeName != nil && *endpointSlices.Endpoints[ix].NodeName != "" {
2✔
2860
                                                cont.setNodeMap(nodeMap, *endpointSlices.Endpoints[ix].NodeName)
1✔
2861
                                        }
1✔
2862
                                }
2863
                        }
2864
                })
2865
        cont.log.Debug("NodeMap: ", nodeMap)
1✔
2866
}
2867

2868
func (sep *serviceEndpoint) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2869
        cont := sep.cont
1✔
2870
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
2871
        if err != nil {
1✔
2872
                serviceLogger(cont.log, service).
×
2873
                        Error("Could not create service key: ", err)
×
2874
                return false
×
2875
        }
×
2876
        endpointsobj, _, err := cont.endpointsIndexer.GetByKey(key)
1✔
2877
        if err != nil {
1✔
2878
                cont.log.Error("Could not lookup endpoints for " +
×
2879
                        key + ": " + err.Error())
×
2880
                return false
×
2881
        }
×
2882
        if endpointsobj != nil {
2✔
2883
                for _, subset := range endpointsobj.(*v1.Endpoints).Subsets {
2✔
2884
                        for _, addr := range subset.Addresses {
2✔
2885
                                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" {
1✔
2886
                                        continue
×
2887
                                }
2888
                                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(),
1✔
2889
                                        addr.TargetRef.Name))
1✔
2890
                        }
2891
                }
2892
        }
2893
        return true
1✔
2894
}
2895

2896
func (seps *serviceEndpointSlice) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2897
        cont := seps.cont
1✔
2898
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2899
        selector := labels.SelectorFromSet(label)
1✔
2900
        epcount := 0
1✔
2901
        childs := make(map[string]struct{})
1✔
2902
        var exists = struct{}{}
1✔
2903
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2904
                func(endpointSliceobj interface{}) {
2✔
2905
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2906
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2907
                                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
1✔
2908
                                        continue
×
2909
                                }
2910
                                epcount++
1✔
2911
                                childs[endpoint.TargetRef.Name] = exists
1✔
2912
                                cont.log.Debug("EndPoint added: ", endpoint.TargetRef.Name)
1✔
2913
                        }
2914
                })
2915
        for child := range childs {
2✔
2916
                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(), child))
1✔
2917
        }
1✔
2918
        return epcount != 0
1✔
2919
}
2920

2921
func getProtocolStr(proto v1.Protocol) string {
1✔
2922
        var protostring string
1✔
2923
        switch proto {
1✔
2924
        case v1.ProtocolUDP:
1✔
2925
                protostring = "udp"
1✔
2926
        case v1.ProtocolTCP:
1✔
2927
                protostring = "tcp"
1✔
2928
        case v1.ProtocolSCTP:
×
2929
                protostring = "sctp"
×
2930
        default:
×
2931
                protostring = "tcp"
×
2932
        }
2933
        return protostring
1✔
2934
}
2935

2936
func (cont *AciController) removeIpFromIngressIPList(ingressIps *[]net.IP, ip net.IP) {
×
2937
        cont.returnServiceIps([]net.IP{ip})
×
2938
        index := -1
×
2939
        for i, v := range *ingressIps {
×
2940
                if v.Equal(ip) {
×
2941
                        index = i
×
2942
                        break
×
2943
                }
2944
        }
2945
        if index == -1 {
×
2946
                return
×
2947
        }
×
2948
        *ingressIps = append((*ingressIps)[:index], (*ingressIps)[index+1:]...)
×
2949
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc