• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11458

19 Dec 2025 11:30AM UTC coverage: 63.107% (-0.03%) from 63.133%
11458

push

travis-pro

allenantony-oc
Add support for port range in network policy

The aci-containers-controller previously ignored the endPort field
in NetworkPolicy rules. This resulted in incorrect ACI hostprotRule
objects that only enabled the starting port of an intended range.

This commit updates the controller to correctly process endPort.
It now maps the port field to fromPort and endPort to toPort,
ensuring that port ranges defined in a NetworkPolicy are properly
enforced in ACI.

31 of 44 new or added lines in 1 file covered. (70.45%)

8 existing lines in 3 files now uncovered.

13380 of 21202 relevant lines covered (63.11%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.52
/pkg/controller/services.go
1
// Copyright 2016 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "errors"
19
        "fmt"
20
        "net"
21
        "reflect"
22
        "regexp"
23
        "sort"
24
        "strconv"
25
        "strings"
26
        "time"
27

28
        "github.com/noironetworks/aci-containers/pkg/apicapi"
29
        "github.com/noironetworks/aci-containers/pkg/metadata"
30
        "github.com/noironetworks/aci-containers/pkg/util"
31
        "github.com/sirupsen/logrus"
32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
        "k8s.io/apimachinery/pkg/fields"
36
        "k8s.io/apimachinery/pkg/labels"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
)
40

41
// Defaults used for service-related settings.
const (
	// DefaultServiceContractScope is the default service contract scope value.
	DefaultServiceContractScope = "context"

	// DefaultServiceExtSubNetShared is the default service ext subnet scope
	// setting (enable shared security); it is off by default.
	DefaultServiceExtSubNetShared = false
)
46

47
func (cont *AciController) initEndpointsInformerFromClient(
48
        kubeClient kubernetes.Interface) {
×
49
        cont.initEndpointsInformerBase(
×
50
                cache.NewListWatchFromClient(
×
51
                        kubeClient.CoreV1().RESTClient(), "endpoints",
×
52
                        metav1.NamespaceAll, fields.Everything()))
×
53
}
×
54

55
func (cont *AciController) initEndpointSliceInformerFromClient(
56
        kubeClient kubernetes.Interface) {
×
57
        cont.initEndpointSliceInformerBase(
×
58
                cache.NewListWatchFromClient(
×
59
                        kubeClient.DiscoveryV1().RESTClient(), "endpointslices",
×
60
                        metav1.NamespaceAll, fields.Everything()))
×
61
}
×
62

63
func (cont *AciController) initEndpointSliceInformerBase(listWatch *cache.ListWatch) {
1✔
64
        cont.endpointSliceIndexer, cont.endpointSliceInformer = cache.NewIndexerInformer(
1✔
65
                listWatch, &discovery.EndpointSlice{}, 0,
1✔
66
                cache.ResourceEventHandlerFuncs{
1✔
67
                        AddFunc: func(obj interface{}) {
2✔
68
                                cont.endpointSliceAdded(obj)
1✔
69
                        },
1✔
70
                        UpdateFunc: func(oldobj interface{}, newobj interface{}) {
1✔
71
                                cont.endpointSliceUpdated(oldobj, newobj)
1✔
72
                        },
1✔
73
                        DeleteFunc: func(obj interface{}) {
1✔
74
                                cont.endpointSliceDeleted(obj)
1✔
75
                        },
1✔
76
                },
77
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
78
        )
79
}
80

81
func (cont *AciController) initEndpointsInformerBase(listWatch *cache.ListWatch) {
1✔
82
        cont.endpointsIndexer, cont.endpointsInformer = cache.NewIndexerInformer(
1✔
83
                listWatch, &v1.Endpoints{}, 0,
1✔
84
                cache.ResourceEventHandlerFuncs{
1✔
85
                        AddFunc: func(obj interface{}) {
2✔
86
                                cont.endpointsAdded(obj)
1✔
87
                        },
1✔
88
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
89
                                cont.endpointsUpdated(old, new)
1✔
90
                        },
1✔
91
                        DeleteFunc: func(obj interface{}) {
1✔
92
                                cont.endpointsDeleted(obj)
1✔
93
                        },
1✔
94
                },
95
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
96
        )
97
}
98

99
func (cont *AciController) initServiceInformerFromClient(
100
        kubeClient *kubernetes.Clientset) {
×
101
        cont.initServiceInformerBase(
×
102
                cache.NewListWatchFromClient(
×
103
                        kubeClient.CoreV1().RESTClient(), "services",
×
104
                        metav1.NamespaceAll, fields.Everything()))
×
105
}
×
106

107
func (cont *AciController) initServiceInformerBase(listWatch *cache.ListWatch) {
1✔
108
        cont.serviceIndexer, cont.serviceInformer = cache.NewIndexerInformer(
1✔
109
                listWatch, &v1.Service{}, 0,
1✔
110
                cache.ResourceEventHandlerFuncs{
1✔
111
                        AddFunc: func(obj interface{}) {
2✔
112
                                cont.serviceAdded(obj)
1✔
113
                        },
1✔
114
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
115
                                cont.serviceUpdated(old, new)
1✔
116
                        },
1✔
117
                        DeleteFunc: func(obj interface{}) {
×
118
                                cont.serviceDeleted(obj)
×
119
                        },
×
120
                },
121
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
122
        )
123
}
124

125
func serviceLogger(log *logrus.Logger, as *v1.Service) *logrus.Entry {
1✔
126
        return log.WithFields(logrus.Fields{
1✔
127
                "namespace": as.ObjectMeta.Namespace,
1✔
128
                "name":      as.ObjectMeta.Name,
1✔
129
                "type":      as.Spec.Type,
1✔
130
        })
1✔
131
}
1✔
132

133
func (cont *AciController) queueIPNetPolUpdates(ips map[string]bool) {
1✔
134
        for ipStr := range ips {
2✔
135
                ip := net.ParseIP(ipStr)
1✔
136
                if ip == nil {
1✔
137
                        continue
×
138
                }
139
                entries, err := cont.netPolSubnetIndex.ContainingNetworks(ip)
1✔
140
                if err != nil {
1✔
141
                        cont.log.Error("Corrupted network policy IP index, err: ", err)
×
142
                        return
×
143
                }
×
144
                for _, entry := range entries {
2✔
145
                        for npkey := range entry.(*ipIndexEntry).keys {
2✔
146
                                cont.queueNetPolUpdateByKey(npkey)
1✔
147
                        }
1✔
148
                }
149
        }
150
}
151

152
func (cont *AciController) queuePortNetPolUpdates(ports map[string]targetPort) {
1✔
153
        for portkey := range ports {
2✔
154
                entry := cont.targetPortIndex[portkey]
1✔
155
                if entry == nil {
2✔
156
                        continue
1✔
157
                }
158
                for npkey := range entry.networkPolicyKeys {
2✔
159
                        cont.queueNetPolUpdateByKey(npkey)
1✔
160
                }
1✔
161
        }
162
}
163

164
func (cont *AciController) queueNetPolForEpAddrs(addrs []v1.EndpointAddress) {
1✔
165
        for _, addr := range addrs {
2✔
166
                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" ||
1✔
167
                        addr.TargetRef.Namespace == "" || addr.TargetRef.Name == "" {
2✔
168
                        continue
1✔
169
                }
170
                podkey := addr.TargetRef.Namespace + "/" + addr.TargetRef.Name
1✔
171
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
172
                ps := make(map[string]bool)
1✔
173
                for _, npkey := range npkeys {
1✔
UNCOV
174
                        cont.queueNetPolUpdateByKey(npkey)
×
UNCOV
175
                        ps[npkey] = true
×
UNCOV
176
                }
×
177
                // Process if the  any matching namedport wildcard policy is present
178
                // ignore np already processed policies
179
                cont.queueMatchingNamedNp(ps, podkey)
1✔
180
        }
181
}
182

183
func (cont *AciController) queueMatchingNamedNp(served map[string]bool, podkey string) {
1✔
184
        cont.indexMutex.Lock()
1✔
185
        for npkey := range cont.nmPortNp {
2✔
186
                if _, ok := served[npkey]; !ok {
2✔
187
                        if cont.checkPodNmpMatchesNp(npkey, podkey) {
2✔
188
                                cont.queueNetPolUpdateByKey(npkey)
1✔
189
                        }
1✔
190
                }
191
        }
192
        cont.indexMutex.Unlock()
1✔
193
}
194
func (cont *AciController) queueEndpointsNetPolUpdates(endpoints *v1.Endpoints) {
1✔
195
        for _, subset := range endpoints.Subsets {
2✔
196
                cont.queueNetPolForEpAddrs(subset.Addresses)
1✔
197
                cont.queueNetPolForEpAddrs(subset.NotReadyAddresses)
1✔
198
        }
1✔
199
}
200

201
func (cont *AciController) returnServiceIps(ips []net.IP) {
1✔
202
        for _, ip := range ips {
2✔
203
                if ip.To4() != nil {
2✔
204
                        cont.serviceIps.DeallocateIp(ip)
1✔
205
                } else if ip.To16() != nil {
3✔
206
                        cont.serviceIps.DeallocateIp(ip)
1✔
207
                }
1✔
208
        }
209
}
210

211
func returnIps(pool *netIps, ips []net.IP) {
1✔
212
        for _, ip := range ips {
2✔
213
                if ip.To4() != nil {
2✔
214
                        pool.V4.AddIp(ip)
1✔
215
                } else if ip.To16() != nil {
3✔
216
                        pool.V6.AddIp(ip)
1✔
217
                }
1✔
218
        }
219
}
220

221
func (cont *AciController) staticMonPolName() string {
1✔
222
        return cont.aciNameForKey("monPol", cont.env.ServiceBd())
1✔
223
}
1✔
224

225
func (cont *AciController) staticMonPolDn() string {
1✔
226
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
227
                return fmt.Sprintf("uni/tn-%s/ipslaMonitoringPol-%s",
1✔
228
                        cont.config.AciVrfTenant, cont.staticMonPolName())
1✔
229
        }
1✔
230
        return ""
×
231
}
232

233
func (cont *AciController) staticServiceObjs() apicapi.ApicSlice {
1✔
234
        var serviceObjs apicapi.ApicSlice
1✔
235

1✔
236
        // Service bridge domain
1✔
237
        bdName := cont.aciNameForKey("bd", cont.env.ServiceBd())
1✔
238
        bd := apicapi.NewFvBD(cont.config.AciVrfTenant, bdName)
1✔
239
        if apicapi.ApicVersion >= "6.0(4c)" {
1✔
240
                bd.SetAttr("serviceBdRoutingDisable", "yes")
×
241
        }
×
242
        bd.SetAttr("arpFlood", "yes")
1✔
243
        bd.SetAttr("ipLearning", "no")
1✔
244
        bd.SetAttr("unkMacUcastAct", cont.config.UnknownMacUnicastAction)
1✔
245
        bdToOut := apicapi.NewRsBdToOut(bd.GetDn(), cont.config.AciL3Out)
1✔
246
        bd.AddChild(bdToOut)
1✔
247
        bdToVrf := apicapi.NewRsCtx(bd.GetDn(), cont.config.AciVrf)
1✔
248
        bd.AddChild(bdToVrf)
1✔
249

1✔
250
        bdn := bd.GetDn()
1✔
251
        for _, cidr := range cont.config.NodeServiceSubnets {
2✔
252
                sn := apicapi.NewFvSubnet(bdn, cidr)
1✔
253
                bd.AddChild(sn)
1✔
254
        }
1✔
255
        serviceObjs = append(serviceObjs, bd)
1✔
256

1✔
257
        // Service IP SLA monitoring policy
1✔
258
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
259
                monPol := apicapi.NewFvIPSLAMonitoringPol(cont.config.AciVrfTenant,
1✔
260
                        cont.staticMonPolName())
1✔
261
                monPol.SetAttr("slaFrequency",
1✔
262
                        strconv.Itoa(cont.config.AciServiceMonitorInterval))
1✔
263
                serviceObjs = append(serviceObjs, monPol)
1✔
264
        }
1✔
265

266
        return serviceObjs
1✔
267
}
268

269
func (cont *AciController) initStaticServiceObjs() {
1✔
270
        cont.apicConn.WriteApicObjects(cont.config.AciPrefix+"_service_static",
1✔
271
                cont.staticServiceObjs())
1✔
272
}
1✔
273

274
func (cont *AciController) updateServicesForNode(nodename string) {
1✔
275
        cont.serviceEndPoints.UpdateServicesForNode(nodename)
1✔
276
}
1✔
277

278
// must have index lock
279
func (cont *AciController) getActiveFabricPathDn(node string) string {
×
280
        var fabricPathDn string
×
281
        sz := len(cont.nodeOpflexDevice[node])
×
282
        for i := range cont.nodeOpflexDevice[node] {
×
283
                device := cont.nodeOpflexDevice[node][sz-1-i]
×
284
                if device.GetAttrStr("state") == "connected" {
×
285
                        fabricPathDn = device.GetAttrStr("fabricPathDn")
×
286
                        break
×
287
                }
288
        }
289
        return fabricPathDn
×
290
}
291

292
func (cont *AciController) getOpflexOdevCount(node string) (int, string) {
×
293
        fabricPathDnCount := make(map[string]int)
×
294
        var maxCount int
×
295
        var fabricPathDn string
×
296
        for _, device := range cont.nodeOpflexDevice[node] {
×
297
                fabricpathdn := device.GetAttrStr("fabricPathDn")
×
298
                count, ok := fabricPathDnCount[fabricpathdn]
×
299
                if ok {
×
300
                        fabricPathDnCount[fabricpathdn] = count + 1
×
301
                } else {
×
302
                        fabricPathDnCount[fabricpathdn] = 1
×
303
                }
×
304
                if fabricPathDnCount[fabricpathdn] >= maxCount {
×
305
                        maxCount = fabricPathDnCount[fabricpathdn]
×
306
                        fabricPathDn = fabricpathdn
×
307
                }
×
308
        }
309
        return maxCount, fabricPathDn
×
310
}
311

312
func deleteDevicesFromList(delDevices, devices apicapi.ApicSlice) apicapi.ApicSlice {
×
313
        var newDevices apicapi.ApicSlice
×
314
        for _, device := range devices {
×
315
                found := false
×
316
                for _, delDev := range delDevices {
×
317
                        if reflect.DeepEqual(delDev, device) {
×
318
                                found = true
×
319
                        }
×
320
                }
321
                if !found {
×
322
                        newDevices = append(newDevices, device)
×
323
                }
×
324
        }
325
        return newDevices
×
326
}
327

328
func (cont *AciController) getAciPodSubnet(pod string) (string, error) {
×
329
        podslice := strings.Split(pod, "-")
×
330
        if len(podslice) < 2 {
×
331
                return "", fmt.Errorf("Failed to get podid from pod")
×
332
        }
×
333
        podid := podslice[1]
×
334
        var subnet string
×
335
        args := []string{
×
336
                "query-target=self",
×
337
        }
×
338
        url := fmt.Sprintf("/api/node/mo/uni/controller/setuppol/setupp-%s.json?%s", podid, strings.Join(args, "&"))
×
339
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
340
        if err != nil {
×
341
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
342
                return subnet, err
×
343
        }
×
344
        for _, obj := range apicresp.Imdata {
×
345
                for _, body := range obj {
×
346
                        tepPool, ok := body.Attributes["tepPool"].(string)
×
347
                        if ok {
×
348
                                subnet = tepPool
×
349
                                break
×
350
                        }
351
                }
352
        }
353
        return subnet, nil
×
354
}
355

356
func (cont *AciController) isSingleOpflexOdev(fabricPathDn string) (bool, error) {
×
357
        pathSlice := strings.Split(fabricPathDn, "/")
×
358
        if len(pathSlice) > 2 {
×
359
                path := pathSlice[2]
×
360
                // topology/<aci_pod_name>/paths-<id>/pathep-<iface> - fabricPathDn if
×
361
                // host is connnected via single link
×
362
                // topology/<aci_pod_name>/protpaths-<id_1>-<id_2>/pathep-<iface> - fabricPathDn if
×
363
                // host is connected to vpc pair
×
364
                if strings.Contains(path, "protpaths-") {
×
365
                        args := []string{
×
366
                                "query-target=self",
×
367
                        }
×
368
                        url := fmt.Sprintf("/api/node/class/vpcDom.json?%s", strings.Join(args, "&"))
×
369
                        apicresp, err := cont.apicConn.GetApicResponse(url)
×
370
                        if err != nil {
×
371
                                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
372
                                return false, err
×
373
                        }
×
374
                        nodeIdSlice := strings.Split(path, "-")
×
375
                        if len(nodeIdSlice) != 3 {
×
376
                                cont.log.Error("Invalid fabricPathDn ", fabricPathDn)
×
377
                                return false, fmt.Errorf("Invalid path in fabricPathDn %s", fabricPathDn)
×
378
                        }
×
379
                        // As host is connected to vpc pair, check if the status of any of the vpcDom is down
380
                        upCount := 0
×
381
                        for i, nodeId := range nodeIdSlice {
×
382
                                instDn := fmt.Sprintf("topology/%s/node-%s/sys/vpc/inst", pathSlice[1], nodeId)
×
383
                                if i == 0 {
×
384
                                        continue
×
385
                                }
386
                                for _, obj := range apicresp.Imdata {
×
387
                                        for _, body := range obj {
×
388
                                                dn, ok := body.Attributes["dn"].(string)
×
389
                                                if ok && strings.Contains(dn, instDn) {
×
390
                                                        peerSt, ok := body.Attributes["peerSt"].(string)
×
391
                                                        if ok && peerSt == "up" {
×
392
                                                                upCount++
×
393
                                                        }
×
394
                                                }
395
                                        }
396
                                }
397
                        }
398
                        if upCount < 2 {
×
399
                                return true, nil
×
400
                        }
×
401
                        return false, nil
×
402
                } else {
×
403
                        return true, nil
×
404
                }
×
405
        }
406
        return false, fmt.Errorf("Invalid fabricPathDn %s ", fabricPathDn)
×
407
}
408

409
func (cont *AciController) createAciPodAnnotation(node string) (aciPodAnnot, error) {
×
410
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
411
        nodeAciPodAnnot := cont.nodeACIPod[node]
×
412
        isSingleOdev := false
×
413
        if odevCount == 1 {
×
414
                var err error
×
415
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
416
                if err != nil {
×
417
                        cont.log.Error(err)
×
418
                        return nodeAciPodAnnot, err
×
419
                }
×
420
        }
421
        if (odevCount == 0) ||
×
422
                (odevCount == 1 && !isSingleOdev) {
×
423
                if nodeAciPodAnnot.aciPod != "none" {
×
424
                        if nodeAciPodAnnot.disconnectTime.IsZero() {
×
425
                                nodeAciPodAnnot.disconnectTime = time.Now()
×
426
                        } else {
×
427
                                currentTime := time.Now()
×
428
                                diff := currentTime.Sub(nodeAciPodAnnot.disconnectTime)
×
429
                                if diff.Seconds() > float64(cont.config.OpflexDeviceReconnectWaitTimeout) {
×
430
                                        nodeAciPodAnnot.aciPod = "none"
×
431
                                        nodeAciPodAnnot.disconnectTime = time.Time{}
×
432
                                }
×
433
                        }
434
                }
435
                return nodeAciPodAnnot, nil
×
436
        } else if (odevCount == 2) ||
×
437
                (odevCount == 1 && isSingleOdev) {
×
438
                // when there is already a connected opflex device,
×
439
                // fabricPathDn will have latest pod iformation
×
440
                // and annotation will be in the form pod-<podid>-<subnet of pod>
×
441
                nodeAciPodAnnot.disconnectTime = time.Time{}
×
442
                nodeAciPod := nodeAciPodAnnot.aciPod
×
443
                pathSlice := strings.Split(fabricPathDn, "/")
×
444
                if len(pathSlice) > 1 {
×
445
                        pod := pathSlice[1]
×
446

×
447
                        // when there is difference in pod info avaliable from fabricPathDn
×
448
                        // and what we have in cache, update info in cache and change annotation on node
×
449
                        if !strings.Contains(nodeAciPod, pod) {
×
450
                                subnet, err := cont.getAciPodSubnet(pod)
×
451
                                if err != nil {
×
452
                                        cont.log.Error("Failed to get subnet of aci pod ", err.Error())
×
453
                                        return nodeAciPodAnnot, err
×
454
                                } else {
×
455
                                        nodeAciPodAnnot.aciPod = pod + "-" + subnet
×
456
                                        return nodeAciPodAnnot, nil
×
457
                                }
×
458
                        } else {
×
459
                                return nodeAciPodAnnot, nil
×
460
                        }
×
461
                } else {
×
462
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
463
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
464
                }
×
465
        }
466
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
467
}
468

469
func (cont *AciController) createNodeAciPodAnnotation(node string) (aciPodAnnot, error) {
×
470
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
471
        nodeAciPodAnnot := cont.nodeACIPodAnnot[node]
×
472
        isSingleOdev := false
×
473
        if odevCount == 1 {
×
474
                var err error
×
475
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
476
                if err != nil {
×
477
                        cont.log.Error(err)
×
478
                        return nodeAciPodAnnot, err
×
479
                }
×
480
        }
481
        if (odevCount == 0) ||
×
482
                (odevCount == 1 && !isSingleOdev) {
×
483
                if nodeAciPodAnnot.aciPod != "none" {
×
484
                        nodeAciPodAnnot.aciPod = "none"
×
485
                }
×
486
                return nodeAciPodAnnot, nil
×
487
        } else if (odevCount == 2) ||
×
488
                (odevCount == 1 && isSingleOdev) {
×
489
                pathSlice := strings.Split(fabricPathDn, "/")
×
490
                if len(pathSlice) > 1 {
×
491

×
492
                        nodeAciPodAnnot.aciPod = pathSlice[1]
×
493
                        return nodeAciPodAnnot, nil
×
494
                } else {
×
495
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
496
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
497
                }
×
498
        }
499
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
500
}
501

502
func (cont *AciController) checkChangeOfOpflexOdevAciPod() {
×
503
        var nodeAnnotationUpdates []string
×
504
        cont.apicConn.SyncMutex.Lock()
×
505
        syncDone := cont.apicConn.SyncDone
×
506
        cont.apicConn.SyncMutex.Unlock()
×
507

×
508
        if !syncDone {
×
509
                return
×
510
        }
×
511

512
        cont.indexMutex.Lock()
×
513
        for node := range cont.nodeACIPodAnnot {
×
514
                annot, err := cont.createNodeAciPodAnnotation(node)
×
515
                if err != nil {
×
516
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
517
                                now := time.Now()
×
518
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
519
                                        annot.lastErrorTime = now
×
520
                                        cont.nodeACIPodAnnot[node] = annot
×
521
                                        cont.log.Error(err.Error())
×
522
                                }
×
523
                        } else {
×
524
                                cont.log.Error(err.Error())
×
525
                        }
×
526
                } else {
×
527
                        if annot != cont.nodeACIPodAnnot[node] {
×
528
                                cont.nodeACIPodAnnot[node] = annot
×
529
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
530
                        }
×
531
                }
532
        }
533
        cont.indexMutex.Unlock()
×
534
        if len(nodeAnnotationUpdates) > 0 {
×
535
                for _, updatednode := range nodeAnnotationUpdates {
×
536
                        go cont.env.NodeAnnotationChanged(updatednode)
×
537
                }
×
538
        }
539
}
540

541
func (cont *AciController) checkChangeOfOdevAciPod() {
×
542
        var nodeAnnotationUpdates []string
×
543
        cont.apicConn.SyncMutex.Lock()
×
544
        syncDone := cont.apicConn.SyncDone
×
545
        cont.apicConn.SyncMutex.Unlock()
×
546

×
547
        if !syncDone {
×
548
                return
×
549
        }
×
550

551
        cont.indexMutex.Lock()
×
552
        for node := range cont.nodeACIPod {
×
553
                annot, err := cont.createAciPodAnnotation(node)
×
554
                if err != nil {
×
555
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
556
                                now := time.Now()
×
557
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
558
                                        annot.lastErrorTime = now
×
559
                                        cont.nodeACIPod[node] = annot
×
560
                                        cont.log.Error(err.Error())
×
561
                                }
×
562
                        } else {
×
563
                                cont.log.Error(err.Error())
×
564
                        }
×
565
                } else {
×
566
                        if annot != cont.nodeACIPod[node] {
×
567
                                cont.nodeACIPod[node] = annot
×
568
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
569
                        }
×
570
                }
571
        }
572
        cont.indexMutex.Unlock()
×
573
        if len(nodeAnnotationUpdates) > 0 {
×
574
                for _, updatednode := range nodeAnnotationUpdates {
×
575
                        go cont.env.NodeAnnotationChanged(updatednode)
×
576
                }
×
577
        }
578
}
579

580
func (cont *AciController) deleteOldOpflexDevices() {
1✔
581
        var nodeUpdates []string
1✔
582
        cont.indexMutex.Lock()
1✔
583
        for node, devices := range cont.nodeOpflexDevice {
1✔
584
                var delDevices apicapi.ApicSlice
×
585
                fabricPathDn := cont.getActiveFabricPathDn(node)
×
586
                if fabricPathDn != "" {
×
587
                        for _, device := range devices {
×
588
                                if device.GetAttrStr("delete") == "true" && device.GetAttrStr("fabricPathDn") != fabricPathDn {
×
589
                                        deleteTimeStr := device.GetAttrStr("deleteTime")
×
590
                                        deleteTime, err := time.Parse(time.RFC3339, deleteTimeStr)
×
591
                                        if err != nil {
×
592
                                                cont.log.Error("Failed to parse opflex device delete time: ", err)
×
593
                                                continue
×
594
                                        }
595
                                        now := time.Now()
×
596
                                        diff := now.Sub(deleteTime)
×
597
                                        if diff.Seconds() >= cont.config.OpflexDeviceDeleteTimeout {
×
598
                                                delDevices = append(delDevices, device)
×
599
                                        }
×
600
                                }
601
                        }
602
                        if len(delDevices) > 0 {
×
603
                                newDevices := deleteDevicesFromList(delDevices, devices)
×
604
                                cont.nodeOpflexDevice[node] = newDevices
×
605
                                cont.log.Info("Opflex device list for node ", node, " after deleting stale entries: ", cont.nodeOpflexDevice[node])
×
606
                                if len(newDevices) == 0 {
×
607
                                        delete(cont.nodeOpflexDevice, node)
×
608
                                }
×
609
                                nodeUpdates = append(nodeUpdates, node)
×
610
                        }
611
                }
612
        }
613
        cont.indexMutex.Unlock()
1✔
614
        if len(nodeUpdates) > 0 {
1✔
615
                cont.postOpflexDeviceDelete(nodeUpdates)
×
616
        }
×
617
}
618

619
// must have index lock
620
func (cont *AciController) setDeleteFlagForOldDevices(node, fabricPathDn string) {
1✔
621
        for _, device := range cont.nodeOpflexDevice[node] {
2✔
622
                if device.GetAttrStr("fabricPathDn") != fabricPathDn {
1✔
623
                        t := time.Now()
×
624
                        device.SetAttr("delete", "true")
×
625
                        device.SetAttr("deleteTime", t.Format(time.RFC3339))
×
626
                }
×
627
        }
628
}
629

630
// must have index lock
631
func (cont *AciController) fabricPathForNode(name string) (string, bool) {
1✔
632
        sz := len(cont.nodeOpflexDevice[name])
1✔
633
        for i := range cont.nodeOpflexDevice[name] {
2✔
634
                device := cont.nodeOpflexDevice[name][sz-1-i]
1✔
635
                deviceState := device.GetAttrStr("state")
1✔
636
                if deviceState == "connected" {
2✔
637
                        if deviceState != device.GetAttrStr("prevState") {
2✔
638
                                cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Info("Processing fabric path for node ",
1✔
639
                                        "when connected device state is found")
1✔
640
                                device.SetAttr("prevState", deviceState)
1✔
641
                        }
1✔
642
                        fabricPathDn := device.GetAttrStr("fabricPathDn")
1✔
643
                        cont.setDeleteFlagForOldDevices(name, fabricPathDn)
1✔
644
                        return fabricPathDn, true
1✔
645
                } else {
1✔
646
                        device.SetAttr("prevState", deviceState)
1✔
647
                }
1✔
648
        }
649
        if sz > 0 {
2✔
650
                // When the opflex-device for a node changes, for example during a live migration,
1✔
651
                // we end up with both the old and the new device objects till the old object
1✔
652
                // ages out on APIC. The new object is at end of the devices list (see opflexDeviceChanged),
1✔
653
                // so we return the fabricPathDn of the last opflex-device.
1✔
654
                cont.fabricPathLogger(cont.nodeOpflexDevice[name][sz-1].GetAttrStr("hostName"),
1✔
655
                        cont.nodeOpflexDevice[name][sz-1]).Info("Processing fabricPathDn for node")
1✔
656
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("fabricPathDn"), true
1✔
657
        }
1✔
658
        return "", false
1✔
659
}
660

661
// must have index lock
662
func (cont *AciController) deviceMacForNode(name string) (string, bool) {
1✔
663
        sz := len(cont.nodeOpflexDevice[name])
1✔
664
        if sz > 0 {
2✔
665
                // When the opflex-device for a node changes, for example when the
1✔
666
                // node is recreated, we end up with both the old and the new
1✔
667
                // device objects till the old object ages out on APIC. The
1✔
668
                // new object is at end of the devices list (see
1✔
669
                // opflexDeviceChanged), so we return the MAC address of the
1✔
670
                // last opflex-device.
1✔
671
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("mac"), true
1✔
672
        }
1✔
673
        return "", false
1✔
674
}
675

676
func apicRedirectDst(rpDn string, ip string, mac string,
677
        descr string, healthGroupDn string, enablePbrTracking bool) apicapi.ApicObject {
1✔
678
        dst := apicapi.NewVnsRedirectDest(rpDn, ip, mac).SetAttr("descr", descr)
1✔
679
        if healthGroupDn != "" && enablePbrTracking {
2✔
680
                dst.AddChild(apicapi.NewVnsRsRedirectHealthGroup(dst.GetDn(),
1✔
681
                        healthGroupDn))
1✔
682
        }
1✔
683
        return dst
1✔
684
}
685

686
func (cont *AciController) apicRedirectPol(name string, tenantName string, nodes []string,
687
        nodeMap map[string]*metadata.ServiceEndpoint,
688
        monPolDn string, enablePbrTracking bool) (apicapi.ApicObject, string) {
1✔
689
        rp := apicapi.NewVnsSvcRedirectPol(tenantName, name)
1✔
690
        rp.SetAttr("thresholdDownAction", "deny")
1✔
691
        if cont.config.DisableResilientHashing {
1✔
692
                rp.SetAttr("resilientHashEnabled", "no")
×
693
        }
×
694
        rpDn := rp.GetDn()
1✔
695
        for _, node := range nodes {
2✔
696
                cont.indexMutex.Lock()
1✔
697
                serviceEp, ok := nodeMap[node]
1✔
698
                if !ok {
1✔
699
                        continue
×
700
                }
701
                if serviceEp.Ipv4 != nil {
2✔
702
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv4.String(),
1✔
703
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
1✔
704
                }
1✔
705
                if serviceEp.Ipv6 != nil {
1✔
706
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv6.String(),
×
707
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
×
708
                }
×
709
                cont.indexMutex.Unlock()
1✔
710
        }
711
        if monPolDn != "" && enablePbrTracking {
2✔
712
                rp.AddChild(apicapi.NewVnsRsIPSLAMonitoringPol(rpDn, monPolDn))
1✔
713
        }
1✔
714
        return rp, rpDn
1✔
715
}
716

717
func apicExtNetCreate(enDn string, ingress string, ipv4 bool,
718
        cidr bool, sharedSec bool) apicapi.ApicObject {
1✔
719
        if !cidr {
2✔
720
                if ipv4 {
2✔
721
                        ingress += "/32"
1✔
722
                } else {
1✔
723
                        ingress += "/128"
×
724
                }
×
725
        }
726
        subnet := apicapi.NewL3extSubnet(enDn, ingress, "", "")
1✔
727
        if sharedSec {
2✔
728
                subnet.SetAttr("scope", "import-security,shared-security")
1✔
729
        }
1✔
730
        return subnet
1✔
731
}
732

733
func apicExtNet(name string, tenantName string, l3Out string,
734
        ingresses []string, sharedSecurity bool, snat bool) apicapi.ApicObject {
1✔
735
        en := apicapi.NewL3extInstP(tenantName, l3Out, name)
1✔
736
        enDn := en.GetDn()
1✔
737
        if snat {
2✔
738
                en.AddChild(apicapi.NewFvRsCons(enDn, name))
1✔
739
        } else {
2✔
740
                en.AddChild(apicapi.NewFvRsProv(enDn, name))
1✔
741
        }
1✔
742

743
        for _, ingress := range ingresses {
2✔
744
                ip, _, _ := net.ParseCIDR(ingress)
1✔
745
                // If ingress is a subnet
1✔
746
                if ip != nil {
2✔
747
                        if ip != nil && ip.To4() != nil {
2✔
748
                                subnet := apicExtNetCreate(enDn, ingress, true, true, sharedSecurity)
1✔
749
                                en.AddChild(subnet)
1✔
750
                        } else if ip != nil && ip.To16() != nil {
1✔
751
                                subnet := apicExtNetCreate(enDn, ingress, false, true, sharedSecurity)
×
752
                                en.AddChild(subnet)
×
753
                        }
×
754
                } else {
1✔
755
                        // If ingress is an IP address
1✔
756
                        ip := net.ParseIP(ingress)
1✔
757
                        if ip != nil && ip.To4() != nil {
2✔
758
                                subnet := apicExtNetCreate(enDn, ingress, true, false, sharedSecurity)
1✔
759
                                en.AddChild(subnet)
1✔
760
                        } else if ip != nil && ip.To16() != nil {
1✔
761
                                subnet := apicExtNetCreate(enDn, ingress, false, false, sharedSecurity)
×
762
                                en.AddChild(subnet)
×
763
                        }
×
764
                }
765
        }
766
        return en
1✔
767
}
768

769
func apicDefaultEgCons(conName string, tenantName string,
770
        appProfile string, epg string) apicapi.ApicObject {
×
771
        enDn := fmt.Sprintf("uni/tn-%s/ap-%s/epg-%s", tenantName, appProfile, epg)
×
772
        return apicapi.NewFvRsCons(enDn, conName)
×
773
}
×
774

775
func apicExtNetCons(conName string, tenantName string,
776
        l3Out string, net string) apicapi.ApicObject {
1✔
777
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
778
        return apicapi.NewFvRsCons(enDn, conName)
1✔
779
}
1✔
780

781
func apicExtNetProv(conName string, tenantName string,
782
        l3Out string, net string) apicapi.ApicObject {
1✔
783
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
784
        return apicapi.NewFvRsProv(enDn, conName)
1✔
785
}
1✔
786

787
// stringInSlice reports whether str is equal to at least one element of
// list. A nil or empty list always yields false.
func stringInSlice(str string, list []string) bool {
	for idx := 0; idx < len(list); idx++ {
		if list[idx] == str {
			return true
		}
	}
	return false
}
796

797
func validScope(scope string) bool {
1✔
798
        validValues := []string{"", "context", "tenant", "global"}
1✔
799
        return stringInSlice(scope, validValues)
1✔
800
}
1✔
801

802
func (cont *AciController) getGraphNameFromContract(name, tenantName string) (string, error) {
×
803
        var graphName string
×
804
        args := []string{
×
805
                "query-target=subtree",
×
806
        }
×
807
        url := fmt.Sprintf("/api/node/mo/uni/tn-%s/brc-%s.json?%s", tenantName, name, strings.Join(args, "&"))
×
808
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
809
        if err != nil {
×
810
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
811
                return graphName, err
×
812
        }
×
813
        for _, obj := range apicresp.Imdata {
×
814
                for class, body := range obj {
×
815
                        if class == "vzRsSubjGraphAtt" {
×
816
                                tnVnsAbsGraphName, ok := body.Attributes["tnVnsAbsGraphName"].(string)
×
817
                                if ok {
×
818
                                        graphName = tnVnsAbsGraphName
×
819
                                }
×
820
                                break
×
821
                        }
822
                }
823
        }
824
        cont.log.Debug("graphName: ", graphName)
×
825
        return graphName, err
×
826
}
827

828
func apicContract(conName string, tenantName string,
829
        graphName string, scopeName string, isSnatPbrFltrChain bool,
830
        customSGAnnot bool) apicapi.ApicObject {
1✔
831
        con := apicapi.NewVzBrCP(tenantName, conName)
1✔
832
        if scopeName != "" && scopeName != "context" {
2✔
833
                con.SetAttr("scope", scopeName)
1✔
834
        }
1✔
835
        cs := apicapi.NewVzSubj(con.GetDn(), "loadbalancedservice")
1✔
836
        csDn := cs.GetDn()
1✔
837
        if isSnatPbrFltrChain {
2✔
838
                cs.SetAttr("revFltPorts", "no")
1✔
839
                inTerm := apicapi.NewVzInTerm(csDn)
1✔
840
                outTerm := apicapi.NewVzOutTerm(csDn)
1✔
841
                inTerm.AddChild(apicapi.NewVzRsInTermGraphAtt(inTerm.GetDn(), graphName))
1✔
842
                inTerm.AddChild(apicapi.NewVzRsFiltAtt(inTerm.GetDn(), conName+"_fromCons-toProv"))
1✔
843
                outTerm.AddChild(apicapi.NewVzRsOutTermGraphAtt(outTerm.GetDn(), graphName))
1✔
844
                outTerm.AddChild(apicapi.NewVzRsFiltAtt(outTerm.GetDn(), conName+"_fromProv-toCons"))
1✔
845
                cs.AddChild(inTerm)
1✔
846
                cs.AddChild(outTerm)
1✔
847
        } else {
2✔
848
                cs.AddChild(apicapi.NewVzRsSubjGraphAtt(csDn, graphName, customSGAnnot))
1✔
849
                cs.AddChild(apicapi.NewVzRsSubjFiltAtt(csDn, conName))
1✔
850
        }
1✔
851
        con.AddChild(cs)
1✔
852
        return con
1✔
853
}
854

855
func apicDevCtx(name string, tenantName string,
856
        graphName string, deviceName string, bdName string, rpDn string, isSnatPbrFltrChain bool) apicapi.ApicObject {
1✔
857
        cc := apicapi.NewVnsLDevCtx(tenantName, name, graphName, "loadbalancer")
1✔
858
        ccDn := cc.GetDn()
1✔
859
        graphDn := fmt.Sprintf("uni/tn-%s/lDevVip-%s", tenantName, deviceName)
1✔
860
        lifDn := fmt.Sprintf("%s/lIf-%s", graphDn, "interface")
1✔
861
        bdDn := fmt.Sprintf("uni/tn-%s/BD-%s", tenantName, bdName)
1✔
862
        cc.AddChild(apicapi.NewVnsRsLDevCtxToLDev(ccDn, graphDn))
1✔
863
        rpDnBase := rpDn
1✔
864
        for _, ctxConn := range []string{"consumer", "provider"} {
2✔
865
                lifCtx := apicapi.NewVnsLIfCtx(ccDn, ctxConn)
1✔
866
                if isSnatPbrFltrChain {
2✔
867
                        if ctxConn == "consumer" {
2✔
868
                                rpDn = rpDnBase + "_Cons"
1✔
869
                        } else {
2✔
870
                                rpDn = rpDnBase + "_Prov"
1✔
871
                        }
1✔
872
                }
873
                lifCtxDn := lifCtx.GetDn()
1✔
874
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToSvcRedirectPol(lifCtxDn, rpDn))
1✔
875
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToBD(lifCtxDn, bdDn))
1✔
876
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToLIf(lifCtxDn, lifDn))
1✔
877
                cc.AddChild(lifCtx)
1✔
878
        }
879
        return cc
1✔
880
}
881

882
func apicFilterEntry(filterDn string, count string, p_start string,
883
        p_end string, protocol string, stateful string, snat bool, outTerm bool) apicapi.ApicObject {
1✔
884
        fe := apicapi.NewVzEntry(filterDn, count)
1✔
885
        fe.SetAttr("etherT", "ip")
1✔
886
        fe.SetAttr("prot", protocol)
1✔
887
        if snat {
2✔
888
                if outTerm {
2✔
889
                        if protocol == "tcp" {
2✔
890
                                fe.SetAttr("tcpRules", "est")
1✔
891
                        }
1✔
892
                        // Reverse the ports for outTerm
893
                        fe.SetAttr("dFromPort", p_start)
1✔
894
                        fe.SetAttr("dToPort", p_end)
1✔
895
                } else {
1✔
896
                        fe.SetAttr("sFromPort", p_start)
1✔
897
                        fe.SetAttr("sToPort", p_end)
1✔
898
                }
1✔
899
        } else {
1✔
900
                fe.SetAttr("dFromPort", p_start)
1✔
901
                fe.SetAttr("dToPort", p_end)
1✔
902
        }
1✔
903
        fe.SetAttr("stateful", stateful)
1✔
904
        return fe
1✔
905
}
906
func apicFilter(name string, tenantName string,
907
        portSpec []v1.ServicePort, snat bool, snatRange portRangeSnat) apicapi.ApicObject {
1✔
908
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
909
        filterDn := filter.GetDn()
1✔
910

1✔
911
        var i int
1✔
912
        var port v1.ServicePort
1✔
913
        for i, port = range portSpec {
2✔
914
                pstr := strconv.Itoa(int(port.Port))
1✔
915
                proto := getProtocolStr(port.Protocol)
1✔
916
                fe := apicFilterEntry(filterDn, strconv.Itoa(i), pstr,
1✔
917
                        pstr, proto, "no", false, false)
1✔
918
                filter.AddChild(fe)
1✔
919
        }
1✔
920

921
        if snat {
1✔
922
                portSpec := []portRangeSnat{snatRange}
×
923
                p_start := strconv.Itoa(portSpec[0].start)
×
924
                p_end := strconv.Itoa(portSpec[0].end)
×
925

×
926
                fe1 := apicFilterEntry(filterDn, strconv.Itoa(i+1), p_start,
×
927
                        p_end, "tcp", "no", false, false)
×
928
                filter.AddChild(fe1)
×
929
                fe2 := apicFilterEntry(filterDn, strconv.Itoa(i+2), p_start,
×
930
                        p_end, "udp", "no", false, false)
×
931
                filter.AddChild(fe2)
×
932
        }
×
933
        return filter
1✔
934
}
935

936
func apicFilterSnat(name string, tenantName string,
937
        portSpec []portRangeSnat, outTerm bool) apicapi.ApicObject {
1✔
938
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
939
        filterDn := filter.GetDn()
1✔
940

1✔
941
        p_start := strconv.Itoa(portSpec[0].start)
1✔
942
        p_end := strconv.Itoa(portSpec[0].end)
1✔
943

1✔
944
        fe := apicFilterEntry(filterDn, "0", p_start,
1✔
945
                p_end, "tcp", "no", true, outTerm)
1✔
946
        filter.AddChild(fe)
1✔
947
        fe1 := apicFilterEntry(filterDn, "1", p_start,
1✔
948
                p_end, "udp", "no", true, outTerm)
1✔
949
        filter.AddChild(fe1)
1✔
950

1✔
951
        return filter
1✔
952
}
1✔
953

954
func (cont *AciController) updateServiceDeviceInstance(key string,
955
        service *v1.Service) error {
1✔
956
        cont.indexMutex.Lock()
1✔
957
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
958
        cont.serviceEndPoints.GetnodesMetadata(key, service, nodeMap)
1✔
959
        cont.indexMutex.Unlock()
1✔
960

1✔
961
        var nodes []string
1✔
962
        for node := range nodeMap {
2✔
963
                nodes = append(nodes, node)
1✔
964
        }
1✔
965
        sort.Strings(nodes)
1✔
966
        name := cont.aciNameForKey("svc", key)
1✔
967
        var conScope string
1✔
968
        scopeVal, ok := service.ObjectMeta.Annotations[metadata.ServiceContractScopeAnnotation]
1✔
969
        if ok {
2✔
970
                normScopeVal := strings.ToLower(scopeVal)
1✔
971
                if !validScope(normScopeVal) {
1✔
972
                        errString := "Invalid service contract scope value provided " + scopeVal
×
973
                        err := errors.New(errString)
×
974
                        serviceLogger(cont.log, service).Error("Could not create contract: ", err)
×
975
                        return err
×
976
                } else {
1✔
977
                        conScope = normScopeVal
1✔
978
                }
1✔
979
        } else {
1✔
980
                conScope = DefaultServiceContractScope
1✔
981
        }
1✔
982

983
        var sharedSecurity bool
1✔
984
        if conScope == "global" {
2✔
985
                sharedSecurity = true
1✔
986
        } else {
2✔
987
                sharedSecurity = DefaultServiceExtSubNetShared
1✔
988
        }
1✔
989

990
        graphName := cont.aciNameForKey("svc", "global")
1✔
991
        deviceName := cont.aciNameForKey("svc", "global")
1✔
992
        _, customSGAnnPresent := service.ObjectMeta.Annotations[metadata.ServiceGraphNameAnnotation]
1✔
993
        if customSGAnnPresent {
1✔
994
                customSG, err := cont.getGraphNameFromContract(name, cont.config.AciVrfTenant)
×
995
                if err == nil {
×
996
                        graphName = customSG
×
997
                }
×
998
        }
999
        cont.log.Debug("Using service graph ", graphName, " for service ", key)
1✔
1000

1✔
1001
        var serviceObjs apicapi.ApicSlice
1✔
1002
        if len(nodes) > 0 {
2✔
1003
                // 1. Service redirect policy
1✔
1004
                // The service redirect policy contains the MAC address
1✔
1005
                // and IP address of each of the service endpoints for
1✔
1006
                // each node that hosts a pod for this service.  The
1✔
1007
                // example below shows the case of two nodes.
1✔
1008
                rp, rpDn :=
1✔
1009
                        cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
1✔
1010
                                nodeMap, cont.staticMonPolDn(), cont.config.AciPbrTrackingNonSnat)
1✔
1011
                serviceObjs = append(serviceObjs, rp)
1✔
1012

1✔
1013
                // 2. Service graph contract and external network
1✔
1014
                // The service graph contract must be bound to the service
1✔
1015
                // graph.  This contract must be consumed by the default
1✔
1016
                // layer 3 network and provided by the service layer 3
1✔
1017
                // network.
1✔
1018
                {
2✔
1019
                        var ingresses []string
1✔
1020
                        for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1021
                                ingresses = append(ingresses, ingress.IP)
1✔
1022
                        }
1✔
1023
                        serviceObjs = append(serviceObjs,
1✔
1024
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1025
                                        cont.config.AciL3Out, ingresses, sharedSecurity, false))
1✔
1026
                }
1027

1028
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, false, customSGAnnPresent)
1✔
1029
                serviceObjs = append(serviceObjs, contract)
1✔
1030
                for _, net := range cont.config.AciExtNetworks {
2✔
1031
                        serviceObjs = append(serviceObjs,
1✔
1032
                                apicExtNetCons(name, cont.config.AciVrfTenant,
1✔
1033
                                        cont.config.AciL3Out, net))
1✔
1034
                }
1✔
1035

1036
                if cont.config.AddExternalContractToDefaultEPG && service.Spec.Type == v1.ServiceTypeLoadBalancer {
1✔
1037
                        defaultEpgTenant := cont.config.DefaultEg.PolicySpace
×
1038
                        defaultEpgStringSplit := strings.Split(cont.config.DefaultEg.Name, "|")
×
1039
                        var defaultEpgName, appProfile string
×
1040
                        if len(defaultEpgStringSplit) > 1 {
×
1041
                                appProfile = defaultEpgStringSplit[0]
×
1042
                                defaultEpgName = defaultEpgStringSplit[1]
×
1043
                        } else {
×
1044
                                appProfile = cont.config.AppProfile
×
1045
                                defaultEpgName = defaultEpgStringSplit[0]
×
1046
                        }
×
1047
                        serviceObjs = append(serviceObjs,
×
1048
                                apicDefaultEgCons(name, defaultEpgTenant, appProfile, defaultEpgName))
×
1049
                }
1050

1051
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1052
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1053

1✔
1054
                _, snat := cont.snatServices[key]
1✔
1055
                filter := apicFilter(name, cont.config.AciVrfTenant,
1✔
1056
                        service.Spec.Ports, snat, defaultPortRange)
1✔
1057
                serviceObjs = append(serviceObjs, filter)
1✔
1058

1✔
1059
                // 3. Device cluster context
1✔
1060
                // The logical device context binds the service contract
1✔
1061
                // to the redirect policy and the device cluster and
1✔
1062
                // bridge domain for the device cluster.
1✔
1063
                serviceObjs = append(serviceObjs,
1✔
1064
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, deviceName,
1✔
1065
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, false))
1✔
1066
        }
1067

1068
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1069
        return nil
1✔
1070
}
1071

1072
func (cont *AciController) updateServiceDeviceInstanceSnat(key string) error {
1✔
1073
        nodeList := cont.nodeIndexer.List()
1✔
1074
        cont.indexMutex.Lock()
1✔
1075
        if len(cont.nodeServiceMetaCache) == 0 {
2✔
1076
                cont.indexMutex.Unlock()
1✔
1077
                return nil
1✔
1078
        }
1✔
1079
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
1080
        sort.Slice(nodeList, func(i, j int) bool {
2✔
1081
                nodeA := nodeList[i].(*v1.Node)
1✔
1082
                nodeB := nodeList[j].(*v1.Node)
1✔
1083
                return nodeA.ObjectMeta.Name < nodeB.ObjectMeta.Name
1✔
1084
        })
1✔
1085
        for itr, nodeItem := range nodeList {
2✔
1086
                if itr == cont.config.MaxSvcGraphNodes {
1✔
1087
                        break
×
1088
                }
1089
                node := nodeItem.(*v1.Node)
1✔
1090
                nodeName := node.ObjectMeta.Name
1✔
1091
                nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
1092
                if !ok {
2✔
1093
                        continue
1✔
1094
                }
1095
                _, ok = cont.fabricPathForNode(nodeName)
1✔
1096
                if !ok {
1✔
1097
                        continue
×
1098
                }
1099
                nodeLabels := node.ObjectMeta.Labels
1✔
1100
                excludeNode := cont.nodeLabelsInExcludeList(nodeLabels)
1✔
1101
                if excludeNode {
1✔
1102
                        continue
×
1103
                }
1104
                nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
1105
        }
1106
        cont.indexMutex.Unlock()
1✔
1107

1✔
1108
        var nodes []string
1✔
1109
        for node := range nodeMap {
2✔
1110
                nodes = append(nodes, node)
1✔
1111
        }
1✔
1112
        sort.Strings(nodes)
1✔
1113
        name := cont.aciNameForKey("snat", key)
1✔
1114
        var conScope = cont.config.SnatSvcContractScope
1✔
1115
        sharedSecurity := true
1✔
1116

1✔
1117
        graphName := cont.aciNameForKey("svc", "global")
1✔
1118
        var serviceObjs apicapi.ApicSlice
1✔
1119
        if len(nodes) > 0 {
2✔
1120
                // 1. Service redirect policy
1✔
1121
                // The service redirect policy contains the MAC address
1✔
1122
                // and IP address of each of the service endpoints for
1✔
1123
                // each node that hosts a pod for this service.
1✔
1124
                // For SNAT with the introduction of filter-chain usage, to work-around
1✔
1125
                // an APIC limitation, creating two PBR policies with same nodes.
1✔
1126
                var rpDn string
1✔
1127
                var rp apicapi.ApicObject
1✔
1128
                if cont.apicConn.SnatPbrFltrChain {
2✔
1129
                        rpCons, rpDnCons :=
1✔
1130
                                cont.apicRedirectPol(name+"_Cons", cont.config.AciVrfTenant, nodes,
1✔
1131
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1132
                        serviceObjs = append(serviceObjs, rpCons)
1✔
1133
                        rpProv, _ :=
1✔
1134
                                cont.apicRedirectPol(name+"_Prov", cont.config.AciVrfTenant, nodes,
1✔
1135
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1136
                        serviceObjs = append(serviceObjs, rpProv)
1✔
1137
                        rpDn = strings.TrimSuffix(rpDnCons, "_Cons")
1✔
1138
                } else {
1✔
1139
                        rp, rpDn =
×
1140
                                cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
×
1141
                                        nodeMap, cont.staticMonPolDn(), true)
×
1142
                        serviceObjs = append(serviceObjs, rp)
×
1143
                }
×
1144
                // 2. Service graph contract and external network
1145
                // The service graph contract must be bound to the
1146
                // service
1147
                // graph.  This contract must be consumed by the default
1148
                // layer 3 network and provided by the service layer 3
1149
                // network.
1150
                {
1✔
1151
                        var ingresses []string
1✔
1152
                        for _, policy := range cont.snatPolicyCache {
2✔
1153
                                ingresses = append(ingresses, policy.SnatIp...)
1✔
1154
                        }
1✔
1155
                        serviceObjs = append(serviceObjs,
1✔
1156
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1157
                                        cont.config.AciL3Out, ingresses, sharedSecurity, true))
1✔
1158
                }
1159

1160
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, cont.apicConn.SnatPbrFltrChain, false)
1✔
1161
                serviceObjs = append(serviceObjs, contract)
1✔
1162

1✔
1163
                for _, net := range cont.config.AciExtNetworks {
2✔
1164
                        serviceObjs = append(serviceObjs,
1✔
1165
                                apicExtNetProv(name, cont.config.AciVrfTenant,
1✔
1166
                                        cont.config.AciL3Out, net))
1✔
1167
                }
1✔
1168

1169
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1170
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1171
                portSpec := []portRangeSnat{defaultPortRange}
1✔
1172
                if cont.apicConn.SnatPbrFltrChain {
2✔
1173
                        filterIn := apicFilterSnat(name+"_fromCons-toProv", cont.config.AciVrfTenant, portSpec, false)
1✔
1174
                        serviceObjs = append(serviceObjs, filterIn)
1✔
1175
                        filterOut := apicFilterSnat(name+"_fromProv-toCons", cont.config.AciVrfTenant, portSpec, true)
1✔
1176
                        serviceObjs = append(serviceObjs, filterOut)
1✔
1177
                } else {
1✔
1178
                        filter := apicFilterSnat(name, cont.config.AciVrfTenant, portSpec, false)
×
1179
                        serviceObjs = append(serviceObjs, filter)
×
1180
                }
×
1181
                // 3. Device cluster context
1182
                // The logical device context binds the service contract
1183
                // to the redirect policy and the device cluster and
1184
                // bridge domain for the device cluster.
1185
                serviceObjs = append(serviceObjs,
1✔
1186
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, graphName,
1✔
1187
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, cont.apicConn.SnatPbrFltrChain))
1✔
1188
        }
1189
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1190
        return nil
1✔
1191
}
1192

1193
func (cont *AciController) nodeLabelsInExcludeList(Labels map[string]string) bool {
1✔
1194
        nodeSnatRedirectExclude := cont.config.NodeSnatRedirectExclude
1✔
1195

1✔
1196
        for _, nodeGroup := range nodeSnatRedirectExclude {
1✔
1197
                if len(nodeGroup.Labels) == 0 {
×
1198
                        continue
×
1199
                }
1200
                matchFound := true
×
1201
                for _, label := range nodeGroup.Labels {
×
1202
                        if _, ok := Labels["node-role.kubernetes.io/"+label]; !ok {
×
1203
                                matchFound = false
×
1204
                                break
×
1205
                        }
1206
                }
1207
                if matchFound {
×
1208
                        return true
×
1209
                }
×
1210
        }
1211
        return false
1✔
1212
}
1213

1214
func (cont *AciController) queueServiceUpdateByKey(key string) {
1✔
1215
        cont.serviceQueue.Add(key)
1✔
1216
}
1✔
1217

1218
func (cont *AciController) queueServiceUpdate(service *v1.Service) {
1✔
1219
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
1220
        if err != nil {
1✔
1221
                serviceLogger(cont.log, service).
×
1222
                        Error("Could not create service key: ", err)
×
1223
                return
×
1224
        }
×
1225
        cont.serviceQueue.Add(key)
1✔
1226
}
1227

1228
func apicDeviceCluster(name string, vrfTenant string,
1229
        physDom string, encap string,
1230
        nodes []string, nodeMap map[string]string) (apicapi.ApicObject, string) {
1✔
1231
        dc := apicapi.NewVnsLDevVip(vrfTenant, name)
1✔
1232
        dc.SetAttr("managed", "no")
1✔
1233
        dcDn := dc.GetDn()
1✔
1234
        dc.AddChild(apicapi.NewVnsRsALDevToPhysDomP(dcDn,
1✔
1235
                fmt.Sprintf("uni/phys-%s", physDom)))
1✔
1236
        lif := apicapi.NewVnsLIf(dcDn, "interface")
1✔
1237
        lif.SetAttr("encap", encap)
1✔
1238
        lifDn := lif.GetDn()
1✔
1239

1✔
1240
        for _, node := range nodes {
2✔
1241
                path, ok := nodeMap[node]
1✔
1242
                if !ok {
1✔
1243
                        continue
×
1244
                }
1245

1246
                cdev := apicapi.NewVnsCDev(dcDn, node)
1✔
1247
                cif := apicapi.NewVnsCif(cdev.GetDn(), "interface")
1✔
1248
                cif.AddChild(apicapi.NewVnsRsCIfPathAtt(cif.GetDn(), path))
1✔
1249
                cdev.AddChild(cif)
1✔
1250
                lif.AddChild(apicapi.NewVnsRsCIfAttN(lifDn, cif.GetDn()))
1✔
1251
                dc.AddChild(cdev)
1✔
1252
        }
1253

1254
        dc.AddChild(lif)
1✔
1255

1✔
1256
        return dc, dcDn
1✔
1257
}
1258

1259
func apicServiceGraph(name string, tenantName string,
1260
        dcDn string) apicapi.ApicObject {
1✔
1261
        sg := apicapi.NewVnsAbsGraph(tenantName, name)
1✔
1262
        sgDn := sg.GetDn()
1✔
1263
        var provDn string
1✔
1264
        var consDn string
1✔
1265
        var cTermDn string
1✔
1266
        var pTermDn string
1✔
1267
        {
2✔
1268
                an := apicapi.NewVnsAbsNode(sgDn, "loadbalancer")
1✔
1269
                an.SetAttr("managed", "no")
1✔
1270
                an.SetAttr("routingMode", "Redirect")
1✔
1271
                anDn := an.GetDn()
1✔
1272
                cons := apicapi.NewVnsAbsFuncConn(anDn, "consumer")
1✔
1273
                consDn = cons.GetDn()
1✔
1274
                an.AddChild(cons)
1✔
1275
                prov := apicapi.NewVnsAbsFuncConn(anDn, "provider")
1✔
1276
                provDn = prov.GetDn()
1✔
1277
                an.AddChild(prov)
1✔
1278
                an.AddChild(apicapi.NewVnsRsNodeToLDev(anDn, dcDn))
1✔
1279
                sg.AddChild(an)
1✔
1280
        }
1✔
1281
        {
1✔
1282
                tnc := apicapi.NewVnsAbsTermNodeCon(sgDn, "T1")
1✔
1283
                tncDn := tnc.GetDn()
1✔
1284
                cTerm := apicapi.NewVnsAbsTermConn(tncDn)
1✔
1285
                cTermDn = cTerm.GetDn()
1✔
1286
                tnc.AddChild(cTerm)
1✔
1287
                tnc.AddChild(apicapi.NewVnsInTerm(tncDn))
1✔
1288
                tnc.AddChild(apicapi.NewVnsOutTerm(tncDn))
1✔
1289
                sg.AddChild(tnc)
1✔
1290
        }
1✔
1291
        {
1✔
1292
                tnp := apicapi.NewVnsAbsTermNodeProv(sgDn, "T2")
1✔
1293
                tnpDn := tnp.GetDn()
1✔
1294
                pTerm := apicapi.NewVnsAbsTermConn(tnpDn)
1✔
1295
                pTermDn = pTerm.GetDn()
1✔
1296
                tnp.AddChild(pTerm)
1✔
1297
                tnp.AddChild(apicapi.NewVnsInTerm(tnpDn))
1✔
1298
                tnp.AddChild(apicapi.NewVnsOutTerm(tnpDn))
1✔
1299
                sg.AddChild(tnp)
1✔
1300
        }
1✔
1301
        {
1✔
1302
                acc := apicapi.NewVnsAbsConnection(sgDn, "C1")
1✔
1303
                acc.SetAttr("connDir", "provider")
1✔
1304
                accDn := acc.GetDn()
1✔
1305
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, consDn))
1✔
1306
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, cTermDn))
1✔
1307
                sg.AddChild(acc)
1✔
1308
        }
1✔
1309
        {
1✔
1310
                acp := apicapi.NewVnsAbsConnection(sgDn, "C2")
1✔
1311
                acp.SetAttr("connDir", "provider")
1✔
1312
                acpDn := acp.GetDn()
1✔
1313
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, provDn))
1✔
1314
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, pTermDn))
1✔
1315
                sg.AddChild(acp)
1✔
1316
        }
1✔
1317
        return sg
1✔
1318
}
1319
func (cont *AciController) updateDeviceCluster() {
1✔
1320
        nodeMap := make(map[string]string)
1✔
1321
        cont.indexMutex.Lock()
1✔
1322
        for node := range cont.nodeOpflexDevice {
2✔
1323
                cont.log.Debug("Processing node in nodeOpflexDevice cache : ", node)
1✔
1324
                fabricPath, ok := cont.fabricPathForNode(node)
1✔
1325
                if !ok {
2✔
1326
                        continue
1✔
1327
                }
1328
                nodeMap[node] = fabricPath
1✔
1329
        }
1330

1331
        // For clusters other than OpenShift On OpenStack,
1332
        // openStackFabricPathDnMap will be empty
1333
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1334
                nodeMap[host] = opflexOdevInfo.fabricPathDn
×
1335
        }
×
1336

1337
        // For OpenShift On OpenStack clusters,
1338
        // hostFabricPathDnMap will be empty
1339
        for _, hostInfo := range cont.hostFabricPathDnMap {
1✔
1340
                if hostInfo.fabricPathDn != "" {
×
1341
                        nodeMap[hostInfo.host] = hostInfo.fabricPathDn
×
1342
                }
×
1343
        }
1344
        cont.indexMutex.Unlock()
1✔
1345

1✔
1346
        var nodes []string
1✔
1347
        for node := range nodeMap {
2✔
1348
                nodes = append(nodes, node)
1✔
1349
        }
1✔
1350
        sort.Strings(nodes)
1✔
1351

1✔
1352
        name := cont.aciNameForKey("svc", "global")
1✔
1353
        var serviceObjs apicapi.ApicSlice
1✔
1354

1✔
1355
        // 1. Device cluster:
1✔
1356
        // The device cluster is a set of physical paths that need to be
1✔
1357
        // created for each node in the cluster, that correspond to the
1✔
1358
        // service interface for each node.
1✔
1359
        dc, dcDn := apicDeviceCluster(name, cont.config.AciVrfTenant,
1✔
1360
                cont.config.AciServicePhysDom, cont.config.AciServiceEncap,
1✔
1361
                nodes, nodeMap)
1✔
1362
        serviceObjs = append(serviceObjs, dc)
1✔
1363

1✔
1364
        // 2. Service graph template
1✔
1365
        // The service graph controls how the traffic will be redirected.
1✔
1366
        // A service graph must be created for each device cluster.
1✔
1367
        serviceObjs = append(serviceObjs,
1✔
1368
                apicServiceGraph(name, cont.config.AciVrfTenant, dcDn))
1✔
1369

1✔
1370
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1371
}
1372

1373
func (cont *AciController) fabricPathLogger(node string,
1374
        obj apicapi.ApicObject) *logrus.Entry {
1✔
1375
        return cont.log.WithFields(logrus.Fields{
1✔
1376
                "fabricPath": obj.GetAttr("fabricPathDn"),
1✔
1377
                "mac":        obj.GetAttr("mac"),
1✔
1378
                "node":       node,
1✔
1379
                "obj":        obj,
1✔
1380
        })
1✔
1381
}
1✔
1382

1383
func (cont *AciController) setOpenStackSystemId() string {
×
1384

×
1385
        // 1) get opflexIDEp with containerName == <node name of any one of the openshift nodes>
×
1386
        // 2) extract OpenStack system id from compHvDn attribute
×
1387
        //    comp/prov-OpenStack/ctrlr-[k8s-scale]-k8s-scale/hv-overcloud-novacompute-0 - sample compHvDn,
×
1388
        //    where k8s-scale is the system id
×
1389

×
1390
        var systemId string
×
1391
        nodeList := cont.nodeIndexer.List()
×
1392
        if len(nodeList) < 1 {
×
1393
                return systemId
×
1394
        }
×
1395
        node := nodeList[0].(*v1.Node)
×
1396
        nodeName := node.ObjectMeta.Name
×
1397
        opflexIDEpFilter := fmt.Sprintf("query-target-filter=and(eq(opflexIDEp.containerName,\"%s\"))", nodeName)
×
1398
        opflexIDEpArgs := []string{
×
1399
                opflexIDEpFilter,
×
1400
        }
×
1401
        url := fmt.Sprintf("/api/node/class/opflexIDEp.json?%s", strings.Join(opflexIDEpArgs, "&"))
×
1402
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1403
        if err != nil {
×
1404
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1405
                return systemId
×
1406
        }
×
1407
        for _, obj := range apicresp.Imdata {
×
1408
                for _, body := range obj {
×
1409
                        compHvDn, ok := body.Attributes["compHvDn"].(string)
×
1410
                        if ok {
×
1411
                                systemId = compHvDn[strings.IndexByte(compHvDn, '[')+1 : strings.IndexByte(compHvDn, ']')]
×
1412
                                break
×
1413
                        }
1414
                }
1415
        }
1416
        cont.indexMutex.Lock()
×
1417
        cont.openStackSystemId = systemId
×
1418
        cont.log.Info("Setting OpenStack system id : ", cont.openStackSystemId)
×
1419
        cont.indexMutex.Unlock()
×
1420
        return systemId
×
1421
}
1422

1423
// Returns true when a new OpenStack opflexODev is added
1424
func (cont *AciController) openStackOpflexOdevUpdate(obj apicapi.ApicObject) bool {
×
1425

×
1426
        // If opflexOdev compHvDn contains comp/prov-OpenShift/ctrlr-[<systemid>]-<systemid>,
×
1427
        // it means that it is an OpenStack OpflexOdev which belongs to OpenStack with system id <systemid>
×
1428

×
1429
        var deviceClusterUpdate bool
×
1430
        compHvDn := obj.GetAttrStr("compHvDn")
×
1431
        if strings.Contains(compHvDn, "prov-OpenStack") {
×
1432
                cont.indexMutex.Lock()
×
1433
                systemId := cont.openStackSystemId
×
1434
                cont.indexMutex.Unlock()
×
1435
                if systemId == "" {
×
1436
                        systemId = cont.setOpenStackSystemId()
×
1437
                }
×
1438
                if systemId == "" {
×
1439
                        cont.log.Error("Failed  to get OpenStack system id")
×
1440
                        return deviceClusterUpdate
×
1441
                }
×
1442
                prefix := fmt.Sprintf("comp/prov-OpenStack/ctrlr-[%s]-%s", systemId, systemId)
×
1443
                if strings.Contains(compHvDn, prefix) {
×
1444
                        cont.log.Info("Received notification for OpenStack opflexODev update, hostName: ",
×
1445
                                obj.GetAttrStr("hostName"), " dn: ", obj.GetAttrStr("dn"))
×
1446
                        cont.indexMutex.Lock()
×
1447
                        opflexOdevInfo, ok := cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")]
×
1448
                        if ok {
×
1449
                                opflexOdevInfo.opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1450
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = opflexOdevInfo
×
1451
                        } else {
×
1452
                                var openstackopflexodevinfo openstackOpflexOdevInfo
×
1453
                                opflexODevDn := make(map[string]struct{})
×
1454
                                opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1455
                                openstackopflexodevinfo.fabricPathDn = obj.GetAttrStr("fabricPathDn")
×
1456
                                openstackopflexodevinfo.opflexODevDn = opflexODevDn
×
1457
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = openstackopflexodevinfo
×
1458
                                deviceClusterUpdate = true
×
1459
                        }
×
1460
                        cont.indexMutex.Unlock()
×
1461
                }
1462
        }
1463
        return deviceClusterUpdate
×
1464
}
1465

1466
func (cont *AciController) infraRtAttEntPDeleted(dn string) {
×
1467
        // dn format : uni/infra/attentp-k8s-scale-esxi-aaep/rtattEntP-[uni/infra/funcprof/accbundle-esxi1-vpc-ipg]
×
1468
        cont.log.Info("Processing delete of infraRtAttEntP: ", dn)
×
1469

×
1470
        // extract uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1471
        re := regexp.MustCompile(`\[(.*?)\]`)
×
1472
        matches := re.FindStringSubmatch(dn)
×
1473

×
1474
        if len(matches) < 2 {
×
1475
                cont.log.Error("Failed to extract ipg from dn : ", dn)
×
1476
                return
×
1477
        }
×
1478
        tdn := matches[1]
×
1479

×
1480
        cont.indexMutex.Lock()
×
1481
        _, ok := cont.hostFabricPathDnMap[tdn]
×
1482
        if ok {
×
1483
                delete(cont.hostFabricPathDnMap, tdn)
×
1484
                cont.log.Info("Deleted ipg : ", tdn)
×
1485
        }
×
1486
        cont.indexMutex.Unlock()
×
1487

×
1488
        if ok {
×
1489
                cont.updateDeviceCluster()
×
1490
        }
×
1491
}
1492

1493
func (cont *AciController) vpcIfDeleted(dn string) {
×
1494
        var deleted bool
×
1495
        cont.indexMutex.Lock()
×
1496
        for tDn, hostInfo := range cont.hostFabricPathDnMap {
×
1497
                if _, present := hostInfo.vpcIfDn[dn]; present {
×
1498
                        cont.log.Info("Deleting vpcIf, dn :", dn)
×
1499
                        delete(hostInfo.vpcIfDn, dn)
×
1500
                        if len(hostInfo.vpcIfDn) == 0 {
×
1501
                                cont.log.Infof("Removing fabricPathDn(%s) of ipg : %s ", hostInfo.fabricPathDn, hostInfo.host)
×
1502
                                hostInfo.fabricPathDn = ""
×
1503
                                deleted = true
×
1504
                        }
×
1505
                        cont.hostFabricPathDnMap[tDn] = hostInfo
×
1506
                }
1507
        }
1508
        cont.indexMutex.Unlock()
×
1509
        if deleted {
×
1510
                cont.updateDeviceCluster()
×
1511
        }
×
1512
}
1513

1514
func (cont *AciController) vpcIfChanged(obj apicapi.ApicObject) {
×
1515
        if cont.updateHostFabricPathDnMap(obj) {
×
1516
                cont.updateDeviceCluster()
×
1517
        }
×
1518
}
1519

1520
func (cont *AciController) updateHostFabricPathDnMap(obj apicapi.ApicObject) bool {
×
1521
        var accBndlGrpDn, fabricPathDn, dn string
×
1522
        for _, body := range obj {
×
1523
                var ok bool
×
1524
                accBndlGrpDn, ok = body.Attributes["accBndlGrpDn"].(string)
×
1525
                if !ok || (ok && accBndlGrpDn == "") {
×
1526
                        cont.log.Error("accBndlGrpDn missing/empty in vpcIf")
×
1527
                        return false
×
1528
                }
×
1529
                fabricPathDn, ok = body.Attributes["fabricPathDn"].(string)
×
1530
                if !ok && (ok && fabricPathDn == "") {
×
1531
                        cont.log.Error("fabricPathDn missing/empty in vpcIf")
×
1532
                        return false
×
1533
                }
×
1534
                dn, ok = body.Attributes["dn"].(string)
×
1535
                if !ok && (ok && dn == "") {
×
1536
                        cont.log.Error("dn missing/empty in vpcIf")
×
1537
                        return false
×
1538
                }
×
1539
        }
1540
        var updated bool
×
1541
        cont.indexMutex.Lock()
×
1542
        // If accBndlGrpDn exists in hostFabricPathDnMap, the vpcIf belongs to the cluster AEP
×
1543
        hostInfo, exists := cont.hostFabricPathDnMap[accBndlGrpDn]
×
1544
        if exists {
×
1545
                if _, present := hostInfo.vpcIfDn[dn]; !present {
×
1546
                        hostInfo.vpcIfDn[dn] = struct{}{}
×
1547
                        cont.log.Infof("vpcIf processing, dn : %s, accBndlGrpDn: %s", dn, accBndlGrpDn)
×
1548
                }
×
1549
                if hostInfo.fabricPathDn != fabricPathDn {
×
1550
                        hostInfo.fabricPathDn = fabricPathDn
×
1551
                        cont.log.Info("Updated fabricPathDn of ipg :", hostInfo.host, " to: ", hostInfo.fabricPathDn)
×
1552
                        updated = true
×
1553
                }
×
1554
                cont.hostFabricPathDnMap[accBndlGrpDn] = hostInfo
×
1555
        }
1556
        cont.indexMutex.Unlock()
×
1557
        return updated
×
1558
}
1559

1560
func (cont *AciController) infraRtAttEntPChanged(obj apicapi.ApicObject) {
×
1561
        var tdn string
×
1562
        for _, body := range obj {
×
1563
                var ok bool
×
1564
                tdn, ok = body.Attributes["tDn"].(string)
×
1565
                if !ok || (ok && tdn == "") {
×
1566
                        cont.log.Error("tDn missing/empty in infraRtAttEntP")
×
1567
                        return
×
1568
                }
×
1569
        }
1570
        var updated bool
×
1571
        cont.log.Info("infraRtAttEntP updated, tDn : ", tdn)
×
1572

×
1573
        // tdn format for vpc : /uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1574
        // tdn format for single leaf : /uni/infra/funcprof/accportgrp-IPG_CLIENT_SIM
×
1575

×
1576
        // Ignore processing of single leaf
×
1577
        if !strings.Contains(tdn, "/accbundle-") {
×
1578
                cont.log.Info("Skipping processing of infraRtAttEntP update, not applicable for non-VPC configuration: ", tdn)
×
1579
                return
×
1580
        }
×
1581

1582
        // extract esxi1-vpc-ipg
1583
        parts := strings.Split(tdn, "/")
×
1584
        lastPart := parts[len(parts)-1]
×
1585
        host := strings.TrimPrefix(lastPart, "accbundle-")
×
1586

×
1587
        // adding entry for ipg in hostFabricPathDnMap
×
1588
        cont.indexMutex.Lock()
×
1589
        _, exists := cont.hostFabricPathDnMap[tdn]
×
1590
        if !exists {
×
1591
                var hostInfo hostFabricInfo
×
1592
                hostInfo.host = host
×
1593
                hostInfo.vpcIfDn = make(map[string]struct{})
×
1594
                cont.hostFabricPathDnMap[tdn] = hostInfo
×
1595
        }
×
1596
        cont.indexMutex.Unlock()
×
1597

×
1598
        accBndlGrpFilter := fmt.Sprintf(`query-target-filter=and(eq(vpcIf.accBndlGrpDn,"%s"))`, tdn)
×
1599
        url := fmt.Sprintf("/api/class/vpcIf.json?%s", accBndlGrpFilter)
×
1600
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1601
        if err != nil {
×
1602
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1603
                return
×
1604
        }
×
1605

1606
        for _, obj := range apicresp.Imdata {
×
1607
                if cont.updateHostFabricPathDnMap(obj) && !updated {
×
1608
                        updated = true
×
1609
                }
×
1610
        }
1611

1612
        if updated {
×
1613
                cont.updateDeviceCluster()
×
1614
        }
×
1615
        return
×
1616
}
1617

1618
func (cont *AciController) opflexDeviceChanged(obj apicapi.ApicObject) {
1✔
1619
        devType := obj.GetAttrStr("devType")
1✔
1620
        domName := obj.GetAttrStr("domName")
1✔
1621
        ctrlrName := obj.GetAttrStr("ctrlrName")
1✔
1622

1✔
1623
        if !cont.config.DisableServiceVlanPreprovisioning && strings.Contains(cont.config.Flavor, "openstack") {
1✔
1624
                if cont.openStackOpflexOdevUpdate(obj) {
×
1625
                        cont.log.Info("OpenStack opflexODev for ", obj.GetAttrStr("hostName"), " is added")
×
1626
                        cont.updateDeviceCluster()
×
1627
                }
×
1628
        }
1629
        if (devType == cont.env.OpFlexDeviceType()) && (domName == cont.config.AciVmmDomain) && (ctrlrName == cont.config.AciVmmController) {
2✔
1630
                cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Processing opflex device update")
1✔
1631
                if obj.GetAttrStr("state") == "disconnected" {
2✔
1632
                        cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Opflex device disconnected")
1✔
1633
                        cont.indexMutex.Lock()
1✔
1634
                        for node, devices := range cont.nodeOpflexDevice {
1✔
1635
                                if node == obj.GetAttrStr("hostName") {
×
1636
                                        for _, device := range devices {
×
1637
                                                if device.GetDn() == obj.GetDn() {
×
1638
                                                        device.SetAttr("state", "disconnected")
×
1639
                                                        cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Debug("Opflex device cache updated for disconnected node")
×
1640
                                                }
×
1641
                                        }
1642
                                        cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", devices)
×
1643
                                        break
×
1644
                                }
1645
                        }
1646
                        cont.indexMutex.Unlock()
1✔
1647
                        cont.updateDeviceCluster()
1✔
1648
                        return
1✔
1649
                }
1650
                var nodeUpdates []string
1✔
1651

1✔
1652
                cont.indexMutex.Lock()
1✔
1653
                nodefound := false
1✔
1654
                for node, devices := range cont.nodeOpflexDevice {
2✔
1655
                        found := false
1✔
1656

1✔
1657
                        if node == obj.GetAttrStr("hostName") {
2✔
1658
                                nodefound = true
1✔
1659
                        }
1✔
1660

1661
                        for i, device := range devices {
2✔
1662
                                if device.GetDn() != obj.GetDn() {
2✔
1663
                                        continue
1✔
1664
                                }
1665
                                found = true
1✔
1666

1✔
1667
                                if obj.GetAttrStr("hostName") != node {
2✔
1668
                                        cont.fabricPathLogger(node, device).
1✔
1669
                                                Debug("Moving opflex device from node")
1✔
1670

1✔
1671
                                        devices = append(devices[:i], devices[i+1:]...)
1✔
1672
                                        cont.nodeOpflexDevice[node] = devices
1✔
1673
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1674
                                        break
1✔
1675
                                } else if (device.GetAttrStr("mac") != obj.GetAttrStr("mac")) ||
1✔
1676
                                        (device.GetAttrStr("fabricPathDn") != obj.GetAttrStr("fabricPathDn")) ||
1✔
1677
                                        (device.GetAttrStr("state") != obj.GetAttrStr("state")) {
2✔
1678
                                        cont.fabricPathLogger(node, obj).
1✔
1679
                                                Debug("Updating opflex device")
1✔
1680

1✔
1681
                                        devices = append(append(devices[:i], devices[i+1:]...), obj)
1✔
1682
                                        cont.nodeOpflexDevice[node] = devices
1✔
1683
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1684
                                        break
1✔
1685
                                }
1686
                        }
1687
                        if !found && obj.GetAttrStr("hostName") == node {
2✔
1688
                                cont.fabricPathLogger(node, obj).
1✔
1689
                                        Debug("Appending opflex device")
1✔
1690

1✔
1691
                                devices = append(devices, obj)
1✔
1692
                                cont.nodeOpflexDevice[node] = devices
1✔
1693
                                nodeUpdates = append(nodeUpdates, node)
1✔
1694
                        }
1✔
1695
                }
1696
                if !nodefound {
2✔
1697
                        node := obj.GetAttrStr("hostName")
1✔
1698
                        cont.fabricPathLogger(node, obj).Debug("Adding opflex device")
1✔
1699
                        cont.nodeOpflexDevice[node] = apicapi.ApicSlice{obj}
1✔
1700
                        nodeUpdates = append(nodeUpdates, node)
1✔
1701
                }
1✔
1702
                cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", cont.nodeOpflexDevice[obj.GetAttrStr("hostName")])
1✔
1703
                cont.indexMutex.Unlock()
1✔
1704

1✔
1705
                for _, node := range nodeUpdates {
2✔
1706
                        cont.env.NodeServiceChanged(node)
1✔
1707
                        cont.erspanSyncOpflexDev()
1✔
1708
                }
1✔
1709
                cont.updateDeviceCluster()
1✔
1710
        }
1711
}
1712

1713
func (cont *AciController) postOpflexDeviceDelete(nodes []string) {
1✔
1714
        cont.updateDeviceCluster()
1✔
1715
        for _, node := range nodes {
2✔
1716
                cont.env.NodeServiceChanged(node)
1✔
1717
                cont.erspanSyncOpflexDev()
1✔
1718
        }
1✔
1719
}
1720

1721
func (cont *AciController) opflexDeviceDeleted(dn string) {
1✔
1722
        var nodeUpdates []string
1✔
1723
        var dnFound bool //to check if the dn belongs to this cluster
1✔
1724
        cont.log.Info("Processing opflex device delete notification of ", dn)
1✔
1725
        cont.indexMutex.Lock()
1✔
1726
        for node, devices := range cont.nodeOpflexDevice {
2✔
1727
                for i, device := range devices {
2✔
1728
                        if device.GetDn() != dn {
2✔
1729
                                continue
1✔
1730
                        }
1731
                        dnFound = true
1✔
1732
                        cont.fabricPathLogger(node, device).
1✔
1733
                                Debug("Deleting opflex device path")
1✔
1734
                        devices = append(devices[:i], devices[i+1:]...)
1✔
1735
                        cont.nodeOpflexDevice[node] = devices
1✔
1736
                        cont.log.Info("Deleted opflex device of node ", node, ": ", dn)
1✔
1737
                        nodeUpdates = append(nodeUpdates, node)
1✔
1738
                        break
1✔
1739
                }
1740
                if len(devices) == 0 {
2✔
1741
                        delete(cont.nodeOpflexDevice, node)
1✔
1742
                }
1✔
1743
        }
1744

1745
        // For clusters other than OpenShift On OpenStack,
1746
        // openStackFabricPathDnMap will be empty
1747
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1748
                if _, ok := opflexOdevInfo.opflexODevDn[dn]; ok {
×
1749
                        cont.log.Info("Received OpenStack opflexODev delete notification for ", dn)
×
1750
                        delete(opflexOdevInfo.opflexODevDn, dn)
×
1751
                        if len(opflexOdevInfo.opflexODevDn) < 1 {
×
1752
                                delete(cont.openStackFabricPathDnMap, host)
×
1753
                                cont.log.Info("OpenStack opflexODev of host ", host, " is deleted from cache")
×
1754
                                dnFound = true
×
1755
                        } else {
×
1756
                                cont.openStackFabricPathDnMap[host] = opflexOdevInfo
×
1757
                        }
×
1758
                        break
×
1759
                }
1760
        }
1761
        cont.indexMutex.Unlock()
1✔
1762

1✔
1763
        if dnFound {
2✔
1764
                cont.postOpflexDeviceDelete(nodeUpdates)
1✔
1765
        }
1✔
1766
}
1767

1768
func (cont *AciController) writeApicSvc(key string, service *v1.Service) {
1✔
1769
        if cont.isCNOEnabled() {
1✔
1770
                return
×
1771
        }
×
1772
        aobj := apicapi.NewVmmInjectedSvc(cont.vmmDomainProvider(),
1✔
1773
                cont.config.AciVmmDomain, cont.config.AciVmmController,
1✔
1774
                service.Namespace, service.Name)
1✔
1775
        aobjDn := aobj.GetDn()
1✔
1776
        aobj.SetAttr("guid", string(service.UID))
1✔
1777

1✔
1778
        svcns := service.ObjectMeta.Namespace
1✔
1779
        _, exists, err := cont.namespaceIndexer.GetByKey(svcns)
1✔
1780
        if err != nil {
1✔
1781
                cont.log.Error("Failed to lookup ns : ", svcns, " ", err)
×
1782
                return
×
1783
        }
×
1784
        if !exists {
2✔
1785
                cont.log.Debug("Namespace of service ", service.ObjectMeta.Name, ": ", svcns, " doesn't exist, hence not sending an update to the APIC")
1✔
1786
                return
1✔
1787
        }
1✔
1788

1789
        if !cont.serviceEndPoints.SetServiceApicObject(aobj, service) {
2✔
1790
                return
1✔
1791
        }
1✔
1792
        var setApicSvcDnsName bool
1✔
1793
        if len(cont.config.ApicHosts) != 0 && apicapi.ApicVersion >= "5.1" {
1✔
1794
                setApicSvcDnsName = true
×
1795
        }
×
1796
        // APIC model only allows one of these
1797
        for _, ingress := range service.Status.LoadBalancer.Ingress {
1✔
1798
                if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1799
                        aobj.SetAttr("lbIp", ingress.IP)
×
1800
                } else if ingress.Hostname != "" {
×
1801
                        ipList, err := net.LookupHost(ingress.Hostname)
×
1802
                        if err == nil && len(ipList) > 0 {
×
1803
                                aobj.SetAttr("lbIp", ipList[0])
×
1804
                        } else {
×
1805
                                cont.log.Errorf("Lookup: err: %v, ipList: %+v", err, ipList)
×
1806
                        }
×
1807
                }
1808
                break
×
1809
        }
1810
        if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != "None" {
2✔
1811
                aobj.SetAttr("clusterIp", service.Spec.ClusterIP)
1✔
1812
        }
1✔
1813

1814
        var t string
1✔
1815
        switch service.Spec.Type {
1✔
1816
        case v1.ServiceTypeClusterIP:
×
1817
                t = "clusterIp"
×
1818
        case v1.ServiceTypeNodePort:
×
1819
                t = "nodePort"
×
1820
        case v1.ServiceTypeLoadBalancer:
1✔
1821
                t = "loadBalancer"
1✔
1822
        case v1.ServiceTypeExternalName:
×
1823
                t = "externalName"
×
1824
        }
1825
        if t != "" {
2✔
1826
                aobj.SetAttr("type", t)
1✔
1827
        }
1✔
1828

1829
        if setApicSvcDnsName || cont.config.Flavor == "k8s-overlay" {
1✔
1830
                dnsName := fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace)
×
1831

×
1832
                for _, ingress := range service.Status.LoadBalancer.Ingress {
×
1833
                        if ingress.Hostname != "" {
×
1834
                                aobj.SetAttr("dnsName", ingress.Hostname)
×
1835
                        } else if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1836
                                aobj.SetAttr("dnsName", dnsName)
×
1837
                        }
×
1838
                }
1839
                if t == "clusterIp" || t == "nodePort" || t == "externalName" {
×
1840
                        aobj.SetAttr("dnsName", dnsName)
×
1841
                }
×
1842
        }
1843
        for _, port := range service.Spec.Ports {
2✔
1844
                proto := getProtocolStr(port.Protocol)
1✔
1845
                p := apicapi.NewVmmInjectedSvcPort(aobjDn,
1✔
1846
                        strconv.Itoa(int(port.Port)), proto, port.TargetPort.String())
1✔
1847
                p.SetAttr("nodePort", strconv.Itoa(int(port.NodePort)))
1✔
1848
                aobj.AddChild(p)
1✔
1849
        }
1✔
1850
        if cont.config.EnableVmmInjectedLabels && service.ObjectMeta.Labels != nil && apicapi.ApicVersion >= "5.2" {
1✔
1851
                for key, val := range service.ObjectMeta.Labels {
×
1852
                        newLabelKey := cont.aciNameForKey("label", key)
×
1853
                        label := apicapi.NewVmmInjectedLabel(aobj.GetDn(),
×
1854
                                newLabelKey, val)
×
1855
                        aobj.AddChild(label)
×
1856
                }
×
1857
        }
1858
        name := cont.aciNameForKey("service-vmm", key)
1✔
1859
        cont.log.Debug("Write Service Object: ", aobj)
1✔
1860
        cont.apicConn.WriteApicObjects(name, apicapi.ApicSlice{aobj})
1✔
1861
        cont.log.Debugf("svcObject: %+v", aobj)
1✔
1862
}
1863

1864
func removeAllConditions(conditions []metav1.Condition, conditionType string) []metav1.Condition {
1✔
1865
        i := 0
1✔
1866
        for _, cond := range conditions {
1✔
1867
                if cond.Type != conditionType {
×
1868
                        conditions[i] = cond
×
1869
                }
×
1870
        }
1871
        return conditions[:i]
1✔
1872
}
1873

1874
func (cont *AciController) updateServiceCondition(service *v1.Service, success bool, reason string, message string) bool {
1✔
1875
        conditionType := "LbIpamAllocation"
1✔
1876

1✔
1877
        var condition metav1.Condition
1✔
1878
        if success {
2✔
1879
                condition.Status = metav1.ConditionTrue
1✔
1880
        } else {
2✔
1881
                condition.Status = metav1.ConditionFalse
1✔
1882
                condition.Message = message
1✔
1883
        }
1✔
1884
        condition.Type = conditionType
1✔
1885
        condition.Reason = reason
1✔
1886
        condition.LastTransitionTime = metav1.Time{time.Now()}
1✔
1887
        for _, cond := range service.Status.Conditions {
2✔
1888
                if cond.Type == conditionType &&
1✔
1889
                        cond.Status == condition.Status &&
1✔
1890
                        cond.Message == condition.Message &&
1✔
1891
                        cond.Reason == condition.Reason {
2✔
1892
                        return false
1✔
1893
                }
1✔
1894
        }
1895

1896
        service.Status.Conditions = removeAllConditions(service.Status.Conditions, conditionType)
1✔
1897
        service.Status.Conditions = append(service.Status.Conditions, condition)
1✔
1898
        return true
1✔
1899
}
1900

1901
func (cont *AciController) validateRequestedIps(lbIpList []string) (net.IP, net.IP, bool) {
1✔
1902
        var ipv4, ipv6 net.IP
1✔
1903
        for _, lbIp := range lbIpList {
2✔
1904
                ip := net.ParseIP(lbIp)
1✔
1905
                if ip != nil {
2✔
1906
                        if ip.To4() != nil {
2✔
1907
                                if ipv4.Equal(net.IP{}) {
2✔
1908
                                        ipv4 = ip
1✔
1909
                                } else {
2✔
1910
                                        cont.log.Error("Annotation should have only one ipv4")
1✔
1911
                                        return ipv4, ipv6, false
1✔
1912
                                }
1✔
1913
                        } else if ip.To16() != nil {
2✔
1914
                                if ipv6.Equal(net.IP{}) {
2✔
1915
                                        ipv6 = ip
1✔
1916
                                } else {
2✔
1917
                                        cont.log.Error("Annotation should have only one ipv6")
1✔
1918
                                        return ipv4, ipv6, false
1✔
1919
                                }
1✔
1920
                        }
1921
                }
1922
        }
1923
        return ipv4, ipv6, true
1✔
1924
}
1925

1926
func (cont *AciController) returnUnusedStaticIngressIps(staticIngressIps, requestedIps []net.IP) {
1✔
1927
        for _, staticIp := range staticIngressIps {
2✔
1928
                found := false
1✔
1929
                for _, reqIp := range requestedIps {
2✔
1930
                        if reqIp.Equal(staticIp) {
2✔
1931
                                found = true
1✔
1932
                        }
1✔
1933
                }
1934
                if !found {
2✔
1935
                        returnIps(cont.staticServiceIps, []net.IP{staticIp})
1✔
1936
                }
1✔
1937
        }
1938
}
1939

1940
func (cont *AciController) allocateServiceIps(servicekey string,
1941
        service *v1.Service) bool {
1✔
1942
        logger := serviceLogger(cont.log, service)
1✔
1943
        cont.indexMutex.Lock()
1✔
1944
        meta, ok := cont.serviceMetaCache[servicekey]
1✔
1945
        if !ok {
2✔
1946
                meta = &serviceMeta{}
1✔
1947
                cont.serviceMetaCache[servicekey] = meta
1✔
1948

1✔
1949
                // Read any existing IPs and attempt to allocate them to the pod
1✔
1950
                for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1951
                        ip := net.ParseIP(ingress.IP)
1✔
1952
                        if ip == nil {
1✔
1953
                                continue
×
1954
                        }
1955
                        if ip.To4() != nil {
2✔
1956
                                if cont.serviceIps.GetV4IpCache()[0].RemoveIp(ip) {
2✔
1957
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1958
                                } else if cont.staticServiceIps.V4.RemoveIp(ip) {
3✔
1959
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1960
                                }
1✔
1961
                        } else if ip.To16() != nil {
2✔
1962
                                if cont.serviceIps.GetV6IpCache()[0].RemoveIp(ip) {
2✔
1963
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1964
                                } else if cont.staticServiceIps.V6.RemoveIp(ip) {
3✔
1965
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1966
                                }
1✔
1967
                        }
1968
                }
1969
        }
1970

1971
        if !cont.serviceSyncEnabled {
2✔
1972
                cont.indexMutex.Unlock()
1✔
1973
                return false
1✔
1974
        }
1✔
1975

1976
        var requestedIps []net.IP
1✔
1977
        // try to give the requested load balancer IP to the pod
1✔
1978
        lbIps, ok := service.ObjectMeta.Annotations[metadata.LbIpAnnotation]
1✔
1979
        if ok {
2✔
1980
                lbIpList := strings.Split(lbIps, ",")
1✔
1981
                ipv4, ipv6, valid := cont.validateRequestedIps(lbIpList)
1✔
1982
                if valid {
2✔
1983
                        if ipv4 != nil {
2✔
1984
                                requestedIps = append(requestedIps, ipv4)
1✔
1985
                        }
1✔
1986
                        if ipv6 != nil {
2✔
1987
                                requestedIps = append(requestedIps, ipv6)
1✔
1988
                        }
1✔
1989
                } else {
1✔
1990
                        cont.returnServiceIps(meta.ingressIps)
1✔
1991
                        cont.log.Error("Invalid LB IP annotation for service ", servicekey)
1✔
1992
                        condUpdated := cont.updateServiceCondition(service, false, "InvalidAnnotation", "Invalid Loadbalancer IP annotation")
1✔
1993
                        if condUpdated {
2✔
1994
                                _, err := cont.updateServiceStatus(service)
1✔
1995
                                if err != nil {
1✔
1996
                                        logger.Error("Failed to update service status : ", err)
×
1997
                                        cont.indexMutex.Unlock()
×
1998
                                        return true
×
1999
                                }
×
2000
                        }
2001
                        cont.indexMutex.Unlock()
1✔
2002
                        return false
1✔
2003
                }
2004
        } else {
1✔
2005
                requestedIp := net.ParseIP(service.Spec.LoadBalancerIP)
1✔
2006
                if requestedIp != nil {
2✔
2007
                        requestedIps = append(requestedIps, requestedIp)
1✔
2008
                }
1✔
2009
        }
2010
        if len(requestedIps) > 0 {
2✔
2011
                meta.requestedIps = []net.IP{}
1✔
2012
                for _, requestedIp := range requestedIps {
2✔
2013
                        hasRequestedIp := false
1✔
2014
                        for _, ip := range meta.staticIngressIps {
2✔
2015
                                if reflect.DeepEqual(requestedIp, ip) {
2✔
2016
                                        hasRequestedIp = true
1✔
2017
                                }
1✔
2018
                        }
2019
                        if !hasRequestedIp {
2✔
2020
                                if requestedIp.To4() != nil &&
1✔
2021
                                        cont.staticServiceIps.V4.RemoveIp(requestedIp) {
2✔
2022
                                        hasRequestedIp = true
1✔
2023
                                } else if requestedIp.To16() != nil &&
2✔
2024
                                        cont.staticServiceIps.V6.RemoveIp(requestedIp) {
2✔
2025
                                        hasRequestedIp = true
1✔
2026
                                }
1✔
2027
                        }
2028
                        if hasRequestedIp {
2✔
2029
                                meta.requestedIps = append(meta.requestedIps, requestedIp)
1✔
2030
                        }
1✔
2031
                }
2032
                cont.returnUnusedStaticIngressIps(meta.staticIngressIps, meta.requestedIps)
1✔
2033
                meta.staticIngressIps = meta.requestedIps
1✔
2034
                cont.returnServiceIps(meta.ingressIps)
1✔
2035
                meta.ingressIps = nil
1✔
2036
                // If no requested ips are allocatable
1✔
2037
                if len(meta.requestedIps) < 1 {
2✔
2038
                        logger.Error("No Requested Ip addresses available for service ", servicekey)
1✔
2039
                        condUpdated := cont.updateServiceCondition(service, false, "RequestedIpsNotAllocatable", "The requested ips for loadbalancer service are not available or not in extern static range")
1✔
2040
                        if condUpdated {
2✔
2041
                                _, err := cont.updateServiceStatus(service)
1✔
2042
                                if err != nil {
1✔
2043
                                        cont.indexMutex.Unlock()
×
2044
                                        logger.Error("Failed to update service status: ", err)
×
2045
                                        return true
×
2046
                                }
×
2047
                        }
2048
                        cont.indexMutex.Unlock()
1✔
2049
                        return false
1✔
2050
                }
2051
        } else if len(meta.requestedIps) > 0 {
1✔
2052
                meta.requestedIps = nil
×
2053
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
×
2054
                meta.staticIngressIps = nil
×
2055
        }
×
2056
        ingressIps := make([]net.IP, 0)
1✔
2057
        ingressIps = append(ingressIps, meta.ingressIps...)
1✔
2058
        ingressIps = append(ingressIps, meta.staticIngressIps...)
1✔
2059

1✔
2060
        var ipv4, ipv6 net.IP
1✔
2061
        for _, ip := range ingressIps {
2✔
2062
                if ip.To4() != nil {
2✔
2063
                        ipv4 = ip
1✔
2064
                } else if ip.To16() != nil {
3✔
2065
                        ipv6 = ip
1✔
2066
                }
1✔
2067
        }
2068
        var clusterIPv4, clusterIPv6 net.IP
1✔
2069
        clusterIPs := append([]string{service.Spec.ClusterIP}, service.Spec.ClusterIPs...)
1✔
2070
        for _, ipStr := range clusterIPs {
2✔
2071
                ip := net.ParseIP(ipStr)
1✔
2072
                if ip == nil {
1✔
2073
                        continue
×
2074
                }
2075
                if ip.To4() != nil && clusterIPv4 == nil {
2✔
2076
                        clusterIPv4 = ip
1✔
2077
                } else if ip.To16() != nil && strings.Contains(ip.String(), ":") && clusterIPv6 == nil {
3✔
2078
                        clusterIPv6 = ip
1✔
2079
                }
1✔
2080
        }
2081
        if clusterIPv4 != nil && ipv4 == nil {
2✔
2082
                if len(requestedIps) < 1 {
2✔
2083
                        ipv4, _ = cont.serviceIps.AllocateIp(true)
1✔
2084
                        if ipv4 != nil {
2✔
2085
                                ingressIps = append(ingressIps, ipv4)
1✔
2086
                        }
1✔
2087
                }
2088
        } else if clusterIPv4 == nil && ipv4 != nil {
1✔
2089
                cont.removeIpFromIngressIPList(&ingressIps, ipv4)
×
2090
        }
×
2091

2092
        if clusterIPv6 != nil && ipv6 == nil {
2✔
2093
                if len(requestedIps) < 1 {
2✔
2094
                        ipv6, _ = cont.serviceIps.AllocateIp(false)
1✔
2095
                        if ipv6 != nil {
2✔
2096
                                ingressIps = append(ingressIps, ipv6)
1✔
2097
                        }
1✔
2098
                }
2099
        } else if clusterIPv6 == nil && ipv6 != nil {
1✔
2100
                cont.removeIpFromIngressIPList(&ingressIps, ipv6)
×
2101
        }
×
2102

2103
        if len(requestedIps) < 1 {
2✔
2104
                meta.ingressIps = ingressIps
1✔
2105
        }
1✔
2106
        if ipv4 == nil && ipv6 == nil {
2✔
2107
                logger.Error("No IP addresses available for service")
1✔
2108
                cont.indexMutex.Unlock()
1✔
2109
                return true
1✔
2110
        }
1✔
2111
        cont.indexMutex.Unlock()
1✔
2112
        var newIngress []v1.LoadBalancerIngress
1✔
2113
        for _, ip := range meta.ingressIps {
2✔
2114
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2115
        }
1✔
2116
        for _, ip := range meta.staticIngressIps {
2✔
2117
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2118
        }
1✔
2119

2120
        ipUpdated := false
1✔
2121
        if !reflect.DeepEqual(newIngress, service.Status.LoadBalancer.Ingress) {
2✔
2122
                service.Status.LoadBalancer.Ingress = newIngress
1✔
2123

1✔
2124
                logger.WithFields(logrus.Fields{
1✔
2125
                        "status": service.Status.LoadBalancer.Ingress,
1✔
2126
                }).Info("Updating service load balancer status")
1✔
2127

1✔
2128
                ipUpdated = true
1✔
2129
        }
1✔
2130

2131
        success := true
1✔
2132
        reason := "Success"
1✔
2133
        message := ""
1✔
2134
        if len(requestedIps) > 0 && len(requestedIps) != len(meta.staticIngressIps) {
1✔
2135
                success = false
×
2136
                reason = "OneIpNotAllocatable"
×
2137
                message = "One of the requested Ips is not allocatable"
×
2138
        }
×
2139
        condUpdated := cont.updateServiceCondition(service, success, reason, message)
1✔
2140
        if ipUpdated || condUpdated {
2✔
2141
                _, err := cont.updateServiceStatus(service)
1✔
2142
                if err != nil {
1✔
2143
                        logger.Error("Failed to update service status: ", err)
×
2144
                        return true
×
2145
                }
×
2146
        }
2147
        return false
1✔
2148
}
2149

2150
func (cont *AciController) handleServiceDelete(servicekey string) bool {
1✔
2151
        if cont.isCNOEnabled() {
1✔
2152
                return false
×
2153
        }
×
2154
        cont.clearLbService(servicekey)
1✔
2155
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("service-vmm",
1✔
2156
                servicekey))
1✔
2157
        return false
1✔
2158
}
2159

2160
func (cont *AciController) handleServiceUpdate(service *v1.Service) bool {
1✔
2161
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2162
        if err != nil {
1✔
2163
                serviceLogger(cont.log, service).
×
2164
                        Error("Could not create service key: ", err)
×
2165
                return false
×
2166
        }
×
2167
        if cont.isCNOEnabled() {
1✔
2168
                return false
×
2169
        }
×
2170
        var requeue bool
1✔
2171
        isLoadBalancer := service.Spec.Type == v1.ServiceTypeLoadBalancer
1✔
2172
        if isLoadBalancer {
2✔
2173
                if *cont.config.AllocateServiceIps {
2✔
2174
                        requeue = cont.allocateServiceIps(servicekey, service)
1✔
2175
                }
1✔
2176
                cont.indexMutex.Lock()
1✔
2177
                if cont.serviceSyncEnabled {
2✔
2178
                        cont.indexMutex.Unlock()
1✔
2179
                        err = cont.updateServiceDeviceInstance(servicekey, service)
1✔
2180
                        if err != nil {
1✔
2181
                                serviceLogger(cont.log, service).
×
2182
                                        Error("Failed to update service device Instance: ", err)
×
2183
                                return true
×
2184
                        }
×
2185
                } else {
1✔
2186
                        cont.indexMutex.Unlock()
1✔
2187
                }
1✔
2188
        } else {
1✔
2189
                cont.clearLbService(servicekey)
1✔
2190
        }
1✔
2191
        cont.writeApicSvc(servicekey, service)
1✔
2192
        return requeue
1✔
2193
}
2194

2195
func (cont *AciController) clearLbService(servicekey string) {
1✔
2196
        cont.indexMutex.Lock()
1✔
2197
        if meta, ok := cont.serviceMetaCache[servicekey]; ok {
2✔
2198
                cont.returnServiceIps(meta.ingressIps)
1✔
2199
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
1✔
2200
                delete(cont.serviceMetaCache, servicekey)
1✔
2201
        }
1✔
2202
        cont.indexMutex.Unlock()
1✔
2203
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("svc", servicekey))
1✔
2204
}
2205

2206
func getEndpointsIps(endpoints *v1.Endpoints) map[string]bool {
1✔
2207
        ips := make(map[string]bool)
1✔
2208
        for _, subset := range endpoints.Subsets {
2✔
2209
                for _, addr := range subset.Addresses {
2✔
2210
                        ips[addr.IP] = true
1✔
2211
                }
1✔
2212
                for _, addr := range subset.NotReadyAddresses {
1✔
2213
                        ips[addr.IP] = true
×
2214
                }
×
2215
        }
2216
        return ips
1✔
2217
}
2218

2219
func getServiceTargetPorts(service *v1.Service) map[string]targetPort {
1✔
2220
        ports := make(map[string]targetPort)
1✔
2221
        for _, port := range service.Spec.Ports {
2✔
2222
                portNum := port.TargetPort.IntValue()
1✔
2223
                if portNum <= 0 {
2✔
2224
                        portNum = int(port.Port)
1✔
2225
                }
1✔
2226
                key := portProto(&port.Protocol) + "-num-" + strconv.Itoa(portNum)
1✔
2227
                ports[key] = targetPort{
1✔
2228
                        proto: port.Protocol,
1✔
2229
                        ports: []int{portNum},
1✔
2230
                }
1✔
2231
        }
2232
        return ports
1✔
2233
}
2234

2235
func (cont *AciController) endpointsAdded(obj interface{}) {
1✔
2236
        endpoints := obj.(*v1.Endpoints)
1✔
2237
        servicekey, err := cache.MetaNamespaceKeyFunc(obj.(*v1.Endpoints))
1✔
2238
        if err != nil {
1✔
2239
                cont.log.Error("Could not create service key: ", err)
×
2240
                return
×
2241
        }
×
2242

2243
        ips := getEndpointsIps(endpoints)
1✔
2244
        cont.indexMutex.Lock()
1✔
2245
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2246
        cont.queueIPNetPolUpdates(ips)
1✔
2247
        cont.indexMutex.Unlock()
1✔
2248

1✔
2249
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2250

1✔
2251
        cont.queueServiceUpdateByKey(servicekey)
1✔
2252
}
2253

2254
func (cont *AciController) endpointsDeleted(obj interface{}) {
1✔
2255
        endpoints, isEndpoints := obj.(*v1.Endpoints)
1✔
2256
        if !isEndpoints {
1✔
2257
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2258
                if !ok {
×
2259
                        cont.log.Error("Received unexpected object: ", obj)
×
2260
                        return
×
2261
                }
×
2262
                endpoints, ok = deletedState.Obj.(*v1.Endpoints)
×
2263
                if !ok {
×
2264
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpoints object: ", deletedState.Obj)
×
2265
                        return
×
2266
                }
×
2267
        }
2268
        servicekey, err := cache.MetaNamespaceKeyFunc(endpoints)
1✔
2269
        if err != nil {
1✔
2270
                cont.log.Error("Could not create service key: ", err)
×
2271
                return
×
2272
        }
×
2273

2274
        ips := getEndpointsIps(endpoints)
1✔
2275
        cont.indexMutex.Lock()
1✔
2276
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2277
        cont.queueIPNetPolUpdates(ips)
1✔
2278
        cont.indexMutex.Unlock()
1✔
2279

1✔
2280
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2281

1✔
2282
        cont.queueServiceUpdateByKey(servicekey)
1✔
2283
}
2284

2285
func (cont *AciController) endpointsUpdated(oldEps, newEps interface{}) {
1✔
2286
        oldendpoints := oldEps.(*v1.Endpoints)
1✔
2287
        newendpoints := newEps.(*v1.Endpoints)
1✔
2288
        servicekey, err := cache.MetaNamespaceKeyFunc(newendpoints)
1✔
2289
        if err != nil {
1✔
2290
                cont.log.Error("Could not create service key: ", err)
×
2291
                return
×
2292
        }
×
2293

2294
        oldIps := getEndpointsIps(oldendpoints)
1✔
2295
        newIps := getEndpointsIps(newendpoints)
1✔
2296
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2297
                cont.indexMutex.Lock()
1✔
2298
                cont.queueIPNetPolUpdates(oldIps)
1✔
2299
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2300
                cont.queueIPNetPolUpdates(newIps)
1✔
2301
                cont.indexMutex.Unlock()
1✔
2302
        }
1✔
2303

2304
        if !reflect.DeepEqual(oldendpoints.Subsets, newendpoints.Subsets) {
2✔
2305
                cont.queueEndpointsNetPolUpdates(oldendpoints)
1✔
2306
                cont.queueEndpointsNetPolUpdates(newendpoints)
1✔
2307
        }
1✔
2308

2309
        cont.queueServiceUpdateByKey(servicekey)
1✔
2310
}
2311

2312
func (cont *AciController) serviceAdded(obj interface{}) {
1✔
2313
        service := obj.(*v1.Service)
1✔
2314
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2315
        if err != nil {
1✔
2316
                serviceLogger(cont.log, service).
×
2317
                        Error("Could not create service key: ", err)
×
2318
                return
×
2319
        }
×
2320

2321
        ports := getServiceTargetPorts(service)
1✔
2322
        cont.indexMutex.Lock()
1✔
2323
        cont.queuePortNetPolUpdates(ports)
1✔
2324
        cont.updateTargetPortIndex(true, servicekey, nil, ports)
1✔
2325
        cont.indexMutex.Unlock()
1✔
2326

1✔
2327
        cont.queueServiceUpdateByKey(servicekey)
1✔
2328
}
2329

2330
func (cont *AciController) serviceUpdated(oldSvc, newSvc interface{}) {
1✔
2331
        oldservice := oldSvc.(*v1.Service)
1✔
2332
        newservice := newSvc.(*v1.Service)
1✔
2333
        servicekey, err := cache.MetaNamespaceKeyFunc(newservice)
1✔
2334
        if err != nil {
1✔
2335
                serviceLogger(cont.log, newservice).
×
2336
                        Error("Could not create service key: ", err)
×
2337
                return
×
2338
        }
×
2339
        oldPorts := getServiceTargetPorts(oldservice)
1✔
2340
        newPorts := getServiceTargetPorts(newservice)
1✔
2341
        if !reflect.DeepEqual(oldPorts, newPorts) {
1✔
2342
                cont.indexMutex.Lock()
×
2343
                cont.queuePortNetPolUpdates(oldPorts)
×
2344
                cont.updateTargetPortIndex(true, servicekey, oldPorts, newPorts)
×
2345
                cont.queuePortNetPolUpdates(newPorts)
×
2346
                cont.indexMutex.Unlock()
×
2347
        }
×
2348
        cont.queueServiceUpdateByKey(servicekey)
1✔
2349
}
2350

2351
func (cont *AciController) serviceDeleted(obj interface{}) {
1✔
2352
        service, isService := obj.(*v1.Service)
1✔
2353
        if !isService {
1✔
2354
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2355
                if !ok {
×
2356
                        serviceLogger(cont.log, service).
×
2357
                                Error("Received unexpected object: ", obj)
×
2358
                        return
×
2359
                }
×
2360
                service, ok = deletedState.Obj.(*v1.Service)
×
2361
                if !ok {
×
2362
                        serviceLogger(cont.log, service).
×
2363
                                Error("DeletedFinalStateUnknown contained non-Services object: ", deletedState.Obj)
×
2364
                        return
×
2365
                }
×
2366
        }
2367
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2368
        if err != nil {
1✔
2369
                serviceLogger(cont.log, service).
×
2370
                        Error("Could not create service key: ", err)
×
2371
                return
×
2372
        }
×
2373

2374
        ports := getServiceTargetPorts(service)
1✔
2375
        cont.indexMutex.Lock()
1✔
2376
        cont.updateTargetPortIndex(true, servicekey, ports, nil)
1✔
2377
        cont.queuePortNetPolUpdates(ports)
1✔
2378
        delete(cont.snatServices, servicekey)
1✔
2379
        cont.indexMutex.Unlock()
1✔
2380

1✔
2381
        deletedServiceKey := "DELETED_" + servicekey
1✔
2382
        cont.queueServiceUpdateByKey(deletedServiceKey)
1✔
2383
}
2384

2385
func (cont *AciController) serviceFullSync() {
1✔
2386
        cache.ListAll(cont.serviceIndexer, labels.Everything(),
1✔
2387
                func(sobj interface{}) {
2✔
2388
                        cont.queueServiceUpdate(sobj.(*v1.Service))
1✔
2389
                })
1✔
2390
}
2391

2392
func (cont *AciController) getEndpointSliceIps(endpointSlice *discovery.EndpointSlice) map[string]bool {
1✔
2393
        ips := make(map[string]bool)
1✔
2394
        for _, endpoints := range endpointSlice.Endpoints {
2✔
2395
                for _, addr := range endpoints.Addresses {
2✔
2396
                        ips[addr] = true
1✔
2397
                }
1✔
2398
        }
2399
        return ips
1✔
2400
}
2401

2402
func (cont *AciController) notReadyEndpointPresent(endpointSlice *discovery.EndpointSlice) bool {
×
2403
        for _, endpoints := range endpointSlice.Endpoints {
×
2404
                if (endpoints.Conditions.Ready != nil && !*endpoints.Conditions.Ready) &&
×
2405
                        (endpoints.Conditions.Terminating == nil || !*endpoints.Conditions.Terminating) {
×
2406
                        return true
×
2407
                }
×
2408
        }
2409
        return false
×
2410
}
2411

2412
func (cont *AciController) getEndpointSliceEpIps(endpoints *discovery.Endpoint) map[string]bool {
×
2413
        ips := make(map[string]bool)
×
2414
        for _, addr := range endpoints.Addresses {
×
2415
                ips[addr] = true
×
2416
        }
×
2417
        return ips
×
2418
}
2419

2420
func (cont *AciController) processDelayedEpSlices() {
1✔
2421
        var processEps []DelayedEpSlice
1✔
2422
        cont.indexMutex.Lock()
1✔
2423
        for i := 0; i < len(cont.delayedEpSlices); i++ {
1✔
2424
                delayedepslice := cont.delayedEpSlices[i]
×
2425
                if time.Now().After(delayedepslice.DelayedTime) {
×
2426
                        var toprocess DelayedEpSlice
×
2427
                        err := util.DeepCopyObj(&delayedepslice, &toprocess)
×
2428
                        if err != nil {
×
2429
                                cont.log.Error(err)
×
2430
                                continue
×
2431
                        }
2432
                        processEps = append(processEps, toprocess)
×
2433
                        cont.delayedEpSlices = append(cont.delayedEpSlices[:i], cont.delayedEpSlices[i+1:]...)
×
2434
                }
2435
        }
2436

2437
        cont.indexMutex.Unlock()
1✔
2438
        for _, epslice := range processEps {
1✔
2439
                //ignore the epslice if newly added endpoint is not ready
×
2440
                if cont.notReadyEndpointPresent(epslice.NewEpSlice) {
×
2441
                        cont.log.Debug("Ignoring the update as the new endpoint is not ready : ", epslice.NewEpSlice)
×
2442
                } else {
×
2443
                        cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice)
×
2444
                        cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice)
×
2445
                }
×
2446
        }
2447
}
2448

2449
func (cont *AciController) endpointSliceAdded(obj interface{}) {
1✔
2450
        endpointslice, ok := obj.(*discovery.EndpointSlice)
1✔
2451
        if !ok {
1✔
2452
                cont.log.Error("error processing Endpointslice object: ", obj)
×
2453
                return
×
2454
        }
×
2455
        servicekey, valid := getServiceKey(endpointslice)
1✔
2456
        if !valid {
1✔
2457
                return
×
2458
        }
×
2459
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2460
        cont.indexMutex.Lock()
1✔
2461
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2462
        cont.queueIPNetPolUpdates(ips)
1✔
2463
        cont.indexMutex.Unlock()
1✔
2464

1✔
2465
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2466

1✔
2467
        cont.queueServiceUpdateByKey(servicekey)
1✔
2468
        cont.log.Info("EndPointSlice Object Added: ", servicekey)
1✔
2469
}
2470

2471
func (cont *AciController) endpointSliceDeleted(obj interface{}) {
1✔
2472
        endpointslice, isEndpointslice := obj.(*discovery.EndpointSlice)
1✔
2473
        if !isEndpointslice {
1✔
2474
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2475
                if !ok {
×
2476
                        cont.log.Error("Received unexpected object: ", obj)
×
2477
                        return
×
2478
                }
×
2479
                endpointslice, ok = deletedState.Obj.(*discovery.EndpointSlice)
×
2480
                if !ok {
×
2481
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpointslice object: ", deletedState.Obj)
×
2482
                        return
×
2483
                }
×
2484
        }
2485
        servicekey, valid := getServiceKey(endpointslice)
1✔
2486
        if !valid {
1✔
2487
                return
×
2488
        }
×
2489
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2490
        cont.indexMutex.Lock()
1✔
2491
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2492
        cont.queueIPNetPolUpdates(ips)
1✔
2493
        cont.indexMutex.Unlock()
1✔
2494
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2495
        cont.queueServiceUpdateByKey(servicekey)
1✔
2496
}
2497

2498
// Checks if the given service is present in the user configured list of services
2499
// for pbr delay and if present, returns the servie specific delay if configured
2500
func (cont *AciController) svcInAddDelayList(name, ns string) (int, bool) {
×
2501
        for _, svc := range cont.config.ServiceGraphEndpointAddDelay.Services {
×
2502
                if svc.Name == name && svc.Namespace == ns {
×
2503
                        return svc.Delay, true
×
2504
                }
×
2505
        }
2506
        return 0, false
×
2507
}
2508

2509
// Check if the endpointslice update notification has any deletion of enpoint
2510
func (cont *AciController) isDeleteEndpointSlice(oldendpointslice, newendpointslice *discovery.EndpointSlice) bool {
×
2511
        del := false
×
2512

×
2513
        // if any endpoint is removed from endpontslice
×
2514
        if len(newendpointslice.Endpoints) < len(oldendpointslice.Endpoints) {
×
2515
                del = true
×
2516
        }
×
2517

2518
        if !del {
×
2519
                // if any one of the endpoint is in terminating state
×
2520
                for _, endpoint := range newendpointslice.Endpoints {
×
2521
                        if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
×
2522
                                del = true
×
2523
                                break
×
2524
                        }
2525
                }
2526
        }
2527
        if !del {
×
2528
                // if any one of endpoint moved from ready state to not-ready state
×
2529
                for ix := range oldendpointslice.Endpoints {
×
2530
                        oldips := cont.getEndpointSliceEpIps(&oldendpointslice.Endpoints[ix])
×
2531
                        for newIx := range newendpointslice.Endpoints {
×
2532
                                newips := cont.getEndpointSliceEpIps(&newendpointslice.Endpoints[newIx])
×
2533
                                if reflect.DeepEqual(oldips, newips) {
×
2534
                                        if (oldendpointslice.Endpoints[ix].Conditions.Ready != nil && *oldendpointslice.Endpoints[ix].Conditions.Ready) &&
×
2535
                                                (newendpointslice.Endpoints[newIx].Conditions.Ready != nil && !*newendpointslice.Endpoints[newIx].Conditions.Ready) {
×
2536
                                                del = true
×
2537
                                        }
×
2538
                                        break
×
2539
                                }
2540
                        }
2541
                }
2542
        }
2543
        return del
×
2544
}
2545

2546
func (cont *AciController) doendpointSliceUpdatedDelay(oldendpointslice *discovery.EndpointSlice,
2547
        newendpointslice *discovery.EndpointSlice) {
×
2548
        svc, ns, valid := getServiceNameAndNs(newendpointslice)
×
2549
        if !valid {
×
2550
                return
×
2551
        }
×
2552
        svckey, valid := getServiceKey(newendpointslice)
×
2553
        if !valid {
×
2554
                return
×
2555
        }
×
2556
        delay := cont.config.ServiceGraphEndpointAddDelay.Delay
×
2557
        svcDelay, exists := cont.svcInAddDelayList(svc, ns)
×
2558
        if svcDelay > 0 {
×
2559
                delay = svcDelay
×
2560
        }
×
2561
        delayedsvc := exists && delay > 0
×
2562
        if delayedsvc {
×
2563
                cont.log.Debug("Delay of ", delay, " seconds is applicable for svc :", svc, " in ns: ", ns)
×
2564
                var delayedepslice DelayedEpSlice
×
2565
                delayedepslice.OldEpSlice = oldendpointslice
×
2566
                delayedepslice.ServiceKey = svckey
×
2567
                delayedepslice.NewEpSlice = newendpointslice
×
2568
                currentTime := time.Now()
×
2569
                delayedepslice.DelayedTime = currentTime.Add(time.Duration(delay) * time.Second)
×
2570
                cont.indexMutex.Lock()
×
2571
                cont.delayedEpSlices = append(cont.delayedEpSlices, &delayedepslice)
×
2572
                cont.indexMutex.Unlock()
×
2573
        } else {
×
2574
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2575
        }
×
2576

2577
        if delayedsvc && cont.isDeleteEndpointSlice(oldendpointslice, newendpointslice) {
×
2578
                cont.log.Debug("Proceeding by ignoring delay as the update is due to delete of endpoint")
×
2579
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2580
        }
×
2581
}
2582

2583
func (cont *AciController) endpointSliceUpdated(oldobj, newobj interface{}) {
1✔
2584
        oldendpointslice, ok := oldobj.(*discovery.EndpointSlice)
1✔
2585
        if !ok {
1✔
2586
                cont.log.Error("error processing Endpointslice object: ", oldobj)
×
2587
                return
×
2588
        }
×
2589
        newendpointslice, ok := newobj.(*discovery.EndpointSlice)
1✔
2590
        if !ok {
1✔
2591
                cont.log.Error("error processing Endpointslice object: ", newobj)
×
2592
                return
×
2593
        }
×
2594
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2595
                cont.doendpointSliceUpdatedDelay(oldendpointslice, newendpointslice)
×
2596
        } else {
1✔
2597
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
1✔
2598
        }
1✔
2599
}
2600

2601
func (cont *AciController) doendpointSliceUpdated(oldendpointslice *discovery.EndpointSlice,
2602
        newendpointslice *discovery.EndpointSlice) {
1✔
2603
        servicekey, valid := getServiceKey(newendpointslice)
1✔
2604
        if !valid {
1✔
2605
                return
×
2606
        }
×
2607
        oldIps := cont.getEndpointSliceIps(oldendpointslice)
1✔
2608
        newIps := cont.getEndpointSliceIps(newendpointslice)
1✔
2609
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2610
                cont.indexMutex.Lock()
1✔
2611
                cont.queueIPNetPolUpdates(oldIps)
1✔
2612
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2613
                cont.queueIPNetPolUpdates(newIps)
1✔
2614
                cont.indexMutex.Unlock()
1✔
2615
        }
1✔
2616

2617
        if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) {
2✔
2618
                cont.queueEndpointSliceNetPolUpdates(oldendpointslice)
1✔
2619
                cont.queueEndpointSliceNetPolUpdates(newendpointslice)
1✔
2620
        }
1✔
2621
        cont.log.Debug("EndPointSlice Object Update: ", servicekey)
1✔
2622
        cont.queueServiceUpdateByKey(servicekey)
1✔
2623
}
2624

2625
func (cont *AciController) queueEndpointSliceNetPolUpdates(endpointslice *discovery.EndpointSlice) {
1✔
2626
        for _, endpoint := range endpointslice.Endpoints {
2✔
2627
                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" ||
1✔
2628
                        endpoint.TargetRef.Namespace == "" || endpoint.TargetRef.Name == "" {
2✔
2629
                        continue
1✔
2630
                }
2631
                if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
1✔
2632
                        continue
×
2633
                }
2634
                podkey := endpoint.TargetRef.Namespace + "/" + endpoint.TargetRef.Name
1✔
2635
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
2636
                ps := make(map[string]bool)
1✔
2637
                for _, npkey := range npkeys {
2✔
2638
                        cont.queueNetPolUpdateByKey(npkey)
1✔
2639
                }
1✔
2640
                // Process if the  any matching namedport wildcard policy is present
2641
                // ignore np already processed policies
2642
                cont.queueMatchingNamedNp(ps, podkey)
1✔
2643
        }
2644
}
2645

2646
func getServiceKey(endPointSlice *discovery.EndpointSlice) (string, bool) {
1✔
2647
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
1✔
2648
        if !ok {
1✔
2649
                return "", false
×
2650
        }
×
2651
        return endPointSlice.ObjectMeta.Namespace + "/" + serviceName, true
1✔
2652
}
2653

2654
func getServiceNameAndNs(endPointSlice *discovery.EndpointSlice) (string, string, bool) {
×
2655
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
×
2656
        if !ok {
×
2657
                return "", "", false
×
2658
        }
×
2659
        return serviceName, endPointSlice.ObjectMeta.Namespace, true
×
2660
}
2661

2662
// can be called with index lock
2663
func (sep *serviceEndpoint) UpdateServicesForNode(nodename string) {
1✔
2664
        cont := sep.cont
1✔
2665
        cache.ListAll(cont.endpointsIndexer, labels.Everything(),
1✔
2666
                func(endpointsobj interface{}) {
2✔
2667
                        endpoints := endpointsobj.(*v1.Endpoints)
1✔
2668
                        for _, subset := range endpoints.Subsets {
2✔
2669
                                for _, addr := range subset.Addresses {
2✔
2670
                                        if addr.NodeName != nil && *addr.NodeName == nodename {
2✔
2671
                                                servicekey, err :=
1✔
2672
                                                        cache.MetaNamespaceKeyFunc(endpointsobj.(*v1.Endpoints))
1✔
2673
                                                if err != nil {
1✔
2674
                                                        cont.log.Error("Could not create endpoints key: ", err)
×
2675
                                                        return
×
2676
                                                }
×
2677
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2678
                                                return
1✔
2679
                                        }
2680
                                }
2681
                        }
2682
                })
2683
}
2684

2685
func (seps *serviceEndpointSlice) UpdateServicesForNode(nodename string) {
1✔
2686
        // 1. List all the endpointslice and check for matching nodename
1✔
2687
        // 2. if it matches trigger the Service update and mark it visited
1✔
2688
        cont := seps.cont
1✔
2689
        visited := make(map[string]bool)
1✔
2690
        cache.ListAll(cont.endpointSliceIndexer, labels.Everything(),
1✔
2691
                func(endpointSliceobj interface{}) {
2✔
2692
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2693
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2694
                                if endpoint.NodeName != nil && *endpoint.NodeName == nodename {
2✔
2695
                                        servicekey, valid := getServiceKey(endpointSlices)
1✔
2696
                                        if !valid {
1✔
2697
                                                return
×
2698
                                        }
×
2699
                                        if _, ok := visited[servicekey]; !ok {
2✔
2700
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2701
                                                visited[servicekey] = true
1✔
2702
                                                return
1✔
2703
                                        }
1✔
2704
                                }
2705
                        }
2706
                })
2707
}
2708
func (cont *AciController) setNodeMap(nodeMap map[string]*metadata.ServiceEndpoint, nodeName string) {
1✔
2709
        nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
2710
        if !ok {
1✔
2711
                return
×
2712
        }
×
2713
        _, ok = cont.fabricPathForNode(nodeName)
1✔
2714
        if !ok {
2✔
2715
                return
1✔
2716
        }
1✔
2717
        nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
2718
}
2719

2720
// 2 cases when epslices corresponding to given service is presnt in delayedEpSlices:
2721
//  1. endpoint not present in delayedEpSlices of the service
2722
//  2. endpoint present in delayedEpSlices of the service but in not ready state
2723
//
2724
// indexMutex lock must be acquired before calling the function
2725
func (cont *AciController) isDelayedEndpoint(endpoint *discovery.Endpoint, svckey string) bool {
×
2726
        delayed := false
×
2727
        endpointips := cont.getEndpointSliceEpIps(endpoint)
×
2728
        for _, delayedepslices := range cont.delayedEpSlices {
×
2729
                if delayedepslices.ServiceKey == svckey {
×
2730
                        var found bool
×
2731
                        epslice := delayedepslices.OldEpSlice
×
2732
                        for ix := range epslice.Endpoints {
×
2733
                                epips := cont.getEndpointSliceEpIps(&epslice.Endpoints[ix])
×
2734
                                if reflect.DeepEqual(endpointips, epips) {
×
2735
                                        // case 2
×
2736
                                        if epslice.Endpoints[ix].Conditions.Ready != nil && !*epslice.Endpoints[ix].Conditions.Ready {
×
2737
                                                delayed = true
×
2738
                                        }
×
2739
                                        found = true
×
2740
                                }
2741
                        }
2742
                        // case 1
2743
                        if !found {
×
2744
                                delayed = true
×
2745
                        }
×
2746
                }
2747
        }
2748
        return delayed
×
2749
}
2750

2751
// set nodemap only if endoint is ready and not in delayedEpSlices
2752
func (cont *AciController) setNodeMapDelay(nodeMap map[string]*metadata.ServiceEndpoint,
2753
        endpoint *discovery.Endpoint, service *v1.Service) {
×
2754
        svckey, err := cache.MetaNamespaceKeyFunc(service)
×
2755
        if err != nil {
×
2756
                cont.log.Error("Could not create service key: ", err)
×
2757
                return
×
2758
        }
×
2759
        if cont.config.NoWaitForServiceEpReadiness ||
×
2760
                (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {
×
2761
                if endpoint.NodeName != nil && *endpoint.NodeName != "" {
×
2762
                        // donot setNodeMap for endpoint if:
×
2763
                        //   endpoint is newly added
×
2764
                        //   endpoint status changed from not ready to ready
×
2765
                        if !cont.isDelayedEndpoint(endpoint, svckey) {
×
2766
                                cont.setNodeMap(nodeMap, *endpoint.NodeName)
×
2767
                        }
×
2768
                }
2769
        }
2770
}
2771

2772
func (sep *serviceEndpoint) GetnodesMetadata(key string,
2773
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2774
        cont := sep.cont
1✔
2775
        endpointsobj, exists, err := cont.endpointsIndexer.GetByKey(key)
1✔
2776
        if err != nil {
1✔
2777
                cont.log.Error("Could not lookup endpoints for " +
×
2778
                        key + ": " + err.Error())
×
2779
        }
×
2780
        if exists && endpointsobj != nil {
2✔
2781
                endpoints := endpointsobj.(*v1.Endpoints)
1✔
2782
                for _, subset := range endpoints.Subsets {
2✔
2783
                        for _, addr := range subset.Addresses {
2✔
2784
                                if addr.NodeName == nil {
2✔
2785
                                        continue
1✔
2786
                                }
2787
                                cont.setNodeMap(nodeMap, *addr.NodeName)
1✔
2788
                        }
2789
                }
2790
        }
2791
        cont.log.Info("NodeMap: ", nodeMap)
1✔
2792
}
2793

2794
func (seps *serviceEndpointSlice) GetnodesMetadata(key string,
2795
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2796
        cont := seps.cont
1✔
2797
        // 1. Get all the Endpoint slices matching the label service-name
1✔
2798
        // 2. update the node map matching with endpoints nodes name
1✔
2799
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2800
        selector := labels.SelectorFromSet(label)
1✔
2801
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2802
                func(endpointSliceobj interface{}) {
2✔
2803
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2804
                        for ix := range endpointSlices.Endpoints {
2✔
2805
                                if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2806
                                        cont.setNodeMapDelay(nodeMap, &endpointSlices.Endpoints[ix], service)
×
2807
                                } else if cont.config.NoWaitForServiceEpReadiness ||
1✔
2808
                                        (endpointSlices.Endpoints[ix].Conditions.Ready != nil && *endpointSlices.Endpoints[ix].Conditions.Ready) {
2✔
2809
                                        if endpointSlices.Endpoints[ix].NodeName != nil && *endpointSlices.Endpoints[ix].NodeName != "" {
2✔
2810
                                                cont.setNodeMap(nodeMap, *endpointSlices.Endpoints[ix].NodeName)
1✔
2811
                                        }
1✔
2812
                                }
2813
                        }
2814
                })
2815
        cont.log.Debug("NodeMap: ", nodeMap)
1✔
2816
}
2817

2818
func (sep *serviceEndpoint) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2819
        cont := sep.cont
1✔
2820
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
2821
        if err != nil {
1✔
2822
                serviceLogger(cont.log, service).
×
2823
                        Error("Could not create service key: ", err)
×
2824
                return false
×
2825
        }
×
2826
        endpointsobj, _, err := cont.endpointsIndexer.GetByKey(key)
1✔
2827
        if err != nil {
1✔
2828
                cont.log.Error("Could not lookup endpoints for " +
×
2829
                        key + ": " + err.Error())
×
2830
                return false
×
2831
        }
×
2832
        if endpointsobj != nil {
2✔
2833
                for _, subset := range endpointsobj.(*v1.Endpoints).Subsets {
2✔
2834
                        for _, addr := range subset.Addresses {
2✔
2835
                                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" {
1✔
2836
                                        continue
×
2837
                                }
2838
                                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(),
1✔
2839
                                        addr.TargetRef.Name))
1✔
2840
                        }
2841
                }
2842
        }
2843
        return true
1✔
2844
}
2845

2846
func (seps *serviceEndpointSlice) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2847
        cont := seps.cont
1✔
2848
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2849
        selector := labels.SelectorFromSet(label)
1✔
2850
        epcount := 0
1✔
2851
        childs := make(map[string]struct{})
1✔
2852
        var exists = struct{}{}
1✔
2853
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2854
                func(endpointSliceobj interface{}) {
2✔
2855
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2856
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2857
                                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
1✔
2858
                                        continue
×
2859
                                }
2860
                                epcount++
1✔
2861
                                childs[endpoint.TargetRef.Name] = exists
1✔
2862
                                cont.log.Debug("EndPoint added: ", endpoint.TargetRef.Name)
1✔
2863
                        }
2864
                })
2865
        for child := range childs {
2✔
2866
                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(), child))
1✔
2867
        }
1✔
2868
        return epcount != 0
1✔
2869
}
2870

2871
func getProtocolStr(proto v1.Protocol) string {
1✔
2872
        var protostring string
1✔
2873
        switch proto {
1✔
2874
        case v1.ProtocolUDP:
1✔
2875
                protostring = "udp"
1✔
2876
        case v1.ProtocolTCP:
1✔
2877
                protostring = "tcp"
1✔
2878
        case v1.ProtocolSCTP:
×
2879
                protostring = "sctp"
×
2880
        default:
×
2881
                protostring = "tcp"
×
2882
        }
2883
        return protostring
1✔
2884
}
2885

2886
func (cont *AciController) removeIpFromIngressIPList(ingressIps *[]net.IP, ip net.IP) {
×
2887
        cont.returnServiceIps([]net.IP{ip})
×
2888
        index := -1
×
2889
        for i, v := range *ingressIps {
×
2890
                if v.Equal(ip) {
×
2891
                        index = i
×
2892
                        break
×
2893
                }
2894
        }
2895
        if index == -1 {
×
2896
                return
×
2897
        }
×
2898
        *ingressIps = append((*ingressIps)[:index], (*ingressIps)[index+1:]...)
×
2899
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc