• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11974

22 May 2026 08:25AM UTC coverage: 63.735% (+0.6%) from 63.185%
11974

Pull #1707

travis-pro

jeffinkottaram
Fix over-permissive named port resolution in NetworkPolicy

Named ports in NetworkPolicies were resolved too broadly, creating a
security gap where traffic to unintended ports was allowed.

When a named port (e.g., "http") resolved to different container port
numbers on different pods (e.g., 9090 on pod A, 8080 on pod B), the
generated rules allowed traffic to ALL resolved port numbers on ALL
matching pods — even when only a specific pod defined the named port
at that number. This meant any pod defining the same port name at a
different number would widen the policy for every other pod.

The service augment path had a related issue where it did not verify
that the service owning a target port mapping was the one matched by
the policy, further contributing to over-matching.

Changes:
- Unified peer and port resolution into a single per-pod pass so each
  rule carries only the IPs of pods that resolve the named port to a
  specific number (per-destination-IP scoping)
- Added support for ingress named ports with empty PodSelectors, which
  were previously skipped with a warning
- Guarded service augment port matching against the service key that
  registered the port mapping
- Moved pod informer startup before endpointSlice informer in
  PrepareRun so that podIndexer is populated when epSlice List fires
  during controller startup, preventing named-port resolution misses
- Removed redundant HasSynced entries from the final WaitForCacheSync
  (namespace and pod informers already synced earlier in the sequence)

Test changes:
- Added waitForPodIndexed helper to enforce the real K8s invariant
  (pods exist before their EndpointSlices) in npfirst test loops
- Reordered npfirst loops to add pods before services (post-run),
  matching the production event sequence
- Kept podsfirst loops with original pre-run ordering (pods + services
  before cont.run) to test the initial-sync path
- Test coverage added for getServiceAugmentByPort (port-range iteration,
  name... (continued)
Pull Request #1707: Fix over-permissive named port resolution in NetworkPolicy

408 of 466 new or added lines in 4 files covered. (87.55%)

13 existing lines in 4 files now uncovered.

13550 of 21260 relevant lines covered (63.73%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.39
/pkg/controller/services.go
1
// Copyright 2016 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "errors"
19
        "fmt"
20
        "net"
21
        "reflect"
22
        "regexp"
23
        "sort"
24
        "strconv"
25
        "strings"
26
        "time"
27

28
        "github.com/noironetworks/aci-containers/pkg/apicapi"
29
        "github.com/noironetworks/aci-containers/pkg/metadata"
30
        "github.com/noironetworks/aci-containers/pkg/util"
31
        "github.com/sirupsen/logrus"
32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
        "k8s.io/apimachinery/pkg/fields"
36
        "k8s.io/apimachinery/pkg/labels"
37
        "k8s.io/apimachinery/pkg/util/intstr"
38
        "k8s.io/client-go/kubernetes"
39
        "k8s.io/client-go/tools/cache"
40
)
41

42
const (
	// DefaultServiceContractScope is the default service contract scope value.
	DefaultServiceContractScope = "context"

	// DefaultServiceExtSubNetShared is the default service external subnet
	// scope setting (whether shared security is enabled).
	DefaultServiceExtSubNetShared = false
)
47

48
func (cont *AciController) initEndpointSliceInformerFromClient(
49
        kubeClient kubernetes.Interface) {
×
50
        cont.initEndpointSliceInformerBase(
×
51
                cache.NewListWatchFromClient(
×
52
                        kubeClient.DiscoveryV1().RESTClient(), "endpointslices",
×
53
                        metav1.NamespaceAll, fields.Everything()))
×
54
}
×
55

56
func (cont *AciController) initEndpointSliceInformerBase(listWatch *cache.ListWatch) {
1✔
57
        cont.endpointSliceIndexer, cont.endpointSliceInformer = cache.NewIndexerInformer(
1✔
58
                listWatch, &discovery.EndpointSlice{}, 0,
1✔
59
                cache.ResourceEventHandlerFuncs{
1✔
60
                        AddFunc: func(obj interface{}) {
2✔
61
                                cont.endpointSliceAdded(obj)
1✔
62
                        },
1✔
63
                        UpdateFunc: func(oldobj interface{}, newobj interface{}) {
1✔
64
                                cont.endpointSliceUpdated(oldobj, newobj)
1✔
65
                        },
1✔
66
                        DeleteFunc: func(obj interface{}) {
1✔
67
                                cont.endpointSliceDeleted(obj)
1✔
68
                        },
1✔
69
                },
70
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
71
        )
72
}
73

74
func (cont *AciController) initServiceInformerFromClient(
75
        kubeClient *kubernetes.Clientset) {
×
76
        cont.initServiceInformerBase(
×
77
                cache.NewListWatchFromClient(
×
78
                        kubeClient.CoreV1().RESTClient(), "services",
×
79
                        metav1.NamespaceAll, fields.Everything()))
×
80
}
×
81

82
func (cont *AciController) initServiceInformerBase(listWatch *cache.ListWatch) {
1✔
83
        cont.serviceIndexer, cont.serviceInformer = cache.NewIndexerInformer(
1✔
84
                listWatch, &v1.Service{}, 0,
1✔
85
                cache.ResourceEventHandlerFuncs{
1✔
86
                        AddFunc: func(obj interface{}) {
2✔
87
                                cont.serviceAdded(obj)
1✔
88
                        },
1✔
89
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
90
                                cont.serviceUpdated(old, new)
1✔
91
                        },
1✔
92
                        DeleteFunc: func(obj interface{}) {
×
93
                                cont.serviceDeleted(obj)
×
94
                        },
×
95
                },
96
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
97
        )
98
}
99

100
func serviceLogger(log *logrus.Logger, as *v1.Service) *logrus.Entry {
1✔
101
        return log.WithFields(logrus.Fields{
1✔
102
                "namespace": as.ObjectMeta.Namespace,
1✔
103
                "name":      as.ObjectMeta.Name,
1✔
104
                "type":      as.Spec.Type,
1✔
105
        })
1✔
106
}
1✔
107

108
func (cont *AciController) queueIPNetPolUpdates(ips map[string]bool) {
1✔
109
        for ipStr := range ips {
2✔
110
                ip := net.ParseIP(ipStr)
1✔
111
                if ip == nil {
1✔
112
                        continue
×
113
                }
114
                entries, err := cont.netPolSubnetIndex.ContainingNetworks(ip)
1✔
115
                if err != nil {
1✔
116
                        cont.log.Error("Corrupted network policy IP index, err: ", err)
×
117
                        return
×
118
                }
×
119
                for _, entry := range entries {
2✔
120
                        for npkey := range entry.(*ipIndexEntry).keys {
2✔
121
                                cont.queueNetPolUpdateByKey(npkey)
1✔
122
                        }
1✔
123
                }
124
        }
125
}
126

127
func (cont *AciController) queuePortNetPolUpdates(ports map[string]targetPort) {
1✔
128
        for portkey := range ports {
2✔
129
                entry := cont.targetPortIndex[portkey]
1✔
130
                if entry == nil {
2✔
131
                        continue
1✔
132
                }
133
                for npkey := range entry.networkPolicyKeys {
2✔
134
                        cont.queueNetPolUpdateByKey(npkey)
1✔
135
                }
1✔
136
        }
137
}
138

139
func (cont *AciController) queueMatchingNamedNp(served map[string]bool, podkey string) {
1✔
140
        cont.indexMutex.Lock()
1✔
141
        for npkey := range cont.nmPortNp {
2✔
142
                if _, ok := served[npkey]; !ok {
2✔
143
                        if cont.checkPodNmpMatchesNp(npkey, podkey) {
2✔
144
                                cont.queueNetPolUpdateByKey(npkey)
1✔
145
                        }
1✔
146
                }
147
        }
148
        cont.indexMutex.Unlock()
1✔
149
}
150

151
func (cont *AciController) returnServiceIps(ips []net.IP) {
1✔
152
        for _, ip := range ips {
2✔
153
                if ip.To4() != nil {
2✔
154
                        cont.serviceIps.DeallocateIp(ip)
1✔
155
                } else if ip.To16() != nil {
3✔
156
                        cont.serviceIps.DeallocateIp(ip)
1✔
157
                }
1✔
158
        }
159
}
160

161
func returnIps(pool *netIps, ips []net.IP) {
1✔
162
        for _, ip := range ips {
2✔
163
                if ip.To4() != nil {
2✔
164
                        pool.V4.AddIp(ip)
1✔
165
                } else if ip.To16() != nil {
3✔
166
                        pool.V6.AddIp(ip)
1✔
167
                }
1✔
168
        }
169
}
170

171
func (cont *AciController) staticMonPolName() string {
1✔
172
        return cont.aciNameForKey("monPol", cont.env.ServiceBd())
1✔
173
}
1✔
174

175
func (cont *AciController) staticMonPolDn() string {
1✔
176
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
177
                return fmt.Sprintf("uni/tn-%s/ipslaMonitoringPol-%s",
1✔
178
                        cont.config.AciVrfTenant, cont.staticMonPolName())
1✔
179
        }
1✔
180
        return ""
×
181
}
182

183
func (cont *AciController) staticServiceObjs() apicapi.ApicSlice {
1✔
184
        var serviceObjs apicapi.ApicSlice
1✔
185

1✔
186
        // Service bridge domain
1✔
187
        bdName := cont.aciNameForKey("bd", cont.env.ServiceBd())
1✔
188
        bd := apicapi.NewFvBD(cont.config.AciVrfTenant, bdName)
1✔
189
        if apicapi.ApicVersion >= "6.0(4c)" {
1✔
190
                bd.SetAttr("serviceBdRoutingDisable", "yes")
×
191
        }
×
192
        bd.SetAttr("arpFlood", "yes")
1✔
193
        bd.SetAttr("ipLearning", "no")
1✔
194
        bd.SetAttr("unkMacUcastAct", cont.config.UnknownMacUnicastAction)
1✔
195
        bdToOut := apicapi.NewRsBdToOut(bd.GetDn(), cont.config.AciL3Out)
1✔
196
        bd.AddChild(bdToOut)
1✔
197
        bdToVrf := apicapi.NewRsCtx(bd.GetDn(), cont.config.AciVrf)
1✔
198
        bd.AddChild(bdToVrf)
1✔
199

1✔
200
        bdn := bd.GetDn()
1✔
201
        for _, cidr := range cont.config.NodeServiceSubnets {
2✔
202
                sn := apicapi.NewFvSubnet(bdn, cidr)
1✔
203
                bd.AddChild(sn)
1✔
204
        }
1✔
205
        serviceObjs = append(serviceObjs, bd)
1✔
206

1✔
207
        // Service IP SLA monitoring policy
1✔
208
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
209
                monPol := apicapi.NewFvIPSLAMonitoringPol(cont.config.AciVrfTenant,
1✔
210
                        cont.staticMonPolName())
1✔
211
                monPol.SetAttr("slaFrequency",
1✔
212
                        strconv.Itoa(cont.config.AciServiceMonitorInterval))
1✔
213
                serviceObjs = append(serviceObjs, monPol)
1✔
214
        }
1✔
215

216
        return serviceObjs
1✔
217
}
218

219
func (cont *AciController) initStaticServiceObjs() {
1✔
220
        cont.apicConn.WriteApicObjects(cont.config.AciPrefix+"_service_static",
1✔
221
                cont.staticServiceObjs())
1✔
222
}
1✔
223

224
func (cont *AciController) updateServicesForNode(nodename string) {
1✔
225
        cont.serviceEndPoints.UpdateServicesForNode(nodename)
1✔
226
}
1✔
227

228
// must have index lock
229
func (cont *AciController) getActiveFabricPathDn(node string) string {
×
230
        var fabricPathDn string
×
231
        sz := len(cont.nodeOpflexDevice[node])
×
232
        for i := range cont.nodeOpflexDevice[node] {
×
233
                device := cont.nodeOpflexDevice[node][sz-1-i]
×
234
                if device.GetAttrStr("state") == "connected" {
×
235
                        fabricPathDn = device.GetAttrStr("fabricPathDn")
×
236
                        break
×
237
                }
238
        }
239
        return fabricPathDn
×
240
}
241

242
func (cont *AciController) getOpflexOdevCount(node string) (int, string) {
×
243
        fabricPathDnCount := make(map[string]int)
×
244
        var maxCount int
×
245
        var fabricPathDn string
×
246
        for _, device := range cont.nodeOpflexDevice[node] {
×
247
                fabricpathdn := device.GetAttrStr("fabricPathDn")
×
248
                count, ok := fabricPathDnCount[fabricpathdn]
×
249
                if ok {
×
250
                        fabricPathDnCount[fabricpathdn] = count + 1
×
251
                } else {
×
252
                        fabricPathDnCount[fabricpathdn] = 1
×
253
                }
×
254
                if fabricPathDnCount[fabricpathdn] >= maxCount {
×
255
                        maxCount = fabricPathDnCount[fabricpathdn]
×
256
                        fabricPathDn = fabricpathdn
×
257
                }
×
258
        }
259
        return maxCount, fabricPathDn
×
260
}
261

262
func deleteDevicesFromList(delDevices, devices apicapi.ApicSlice) apicapi.ApicSlice {
×
263
        var newDevices apicapi.ApicSlice
×
264
        for _, device := range devices {
×
265
                found := false
×
266
                for _, delDev := range delDevices {
×
267
                        if reflect.DeepEqual(delDev, device) {
×
268
                                found = true
×
269
                        }
×
270
                }
271
                if !found {
×
272
                        newDevices = append(newDevices, device)
×
273
                }
×
274
        }
275
        return newDevices
×
276
}
277

278
func (cont *AciController) getAciPodSubnet(pod string) (string, error) {
×
279
        podslice := strings.Split(pod, "-")
×
280
        if len(podslice) < 2 {
×
281
                return "", fmt.Errorf("Failed to get podid from pod")
×
282
        }
×
283
        podid := podslice[1]
×
284
        var subnet string
×
285
        args := []string{
×
286
                "query-target=self",
×
287
        }
×
288
        url := fmt.Sprintf("/api/node/mo/uni/controller/setuppol/setupp-%s.json?%s", podid, strings.Join(args, "&"))
×
289
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
290
        if err != nil {
×
291
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
292
                return subnet, err
×
293
        }
×
294
        for _, obj := range apicresp.Imdata {
×
295
                for _, body := range obj {
×
296
                        tepPool, ok := body.Attributes["tepPool"].(string)
×
297
                        if ok {
×
298
                                subnet = tepPool
×
299
                                break
×
300
                        }
301
                }
302
        }
303
        return subnet, nil
×
304
}
305

306
func (cont *AciController) isSingleOpflexOdev(fabricPathDn string) (bool, error) {
×
307
        pathSlice := strings.Split(fabricPathDn, "/")
×
308
        if len(pathSlice) > 2 {
×
309
                path := pathSlice[2]
×
310
                // topology/<aci_pod_name>/paths-<id>/pathep-<iface> - fabricPathDn if
×
311
                // host is connnected via single link
×
312
                // topology/<aci_pod_name>/protpaths-<id_1>-<id_2>/pathep-<iface> - fabricPathDn if
×
313
                // host is connected to vpc pair
×
314
                if strings.Contains(path, "protpaths-") {
×
315
                        args := []string{
×
316
                                "query-target=self",
×
317
                        }
×
318
                        url := fmt.Sprintf("/api/node/class/vpcDom.json?%s", strings.Join(args, "&"))
×
319
                        apicresp, err := cont.apicConn.GetApicResponse(url)
×
320
                        if err != nil {
×
321
                                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
322
                                return false, err
×
323
                        }
×
324
                        nodeIdSlice := strings.Split(path, "-")
×
325
                        if len(nodeIdSlice) != 3 {
×
326
                                cont.log.Error("Invalid fabricPathDn ", fabricPathDn)
×
327
                                return false, fmt.Errorf("Invalid path in fabricPathDn %s", fabricPathDn)
×
328
                        }
×
329
                        // As host is connected to vpc pair, check if the status of any of the vpcDom is down
330
                        upCount := 0
×
331
                        for i, nodeId := range nodeIdSlice {
×
332
                                instDn := fmt.Sprintf("topology/%s/node-%s/sys/vpc/inst", pathSlice[1], nodeId)
×
333
                                if i == 0 {
×
334
                                        continue
×
335
                                }
336
                                for _, obj := range apicresp.Imdata {
×
337
                                        for _, body := range obj {
×
338
                                                dn, ok := body.Attributes["dn"].(string)
×
339
                                                if ok && strings.Contains(dn, instDn) {
×
340
                                                        peerSt, ok := body.Attributes["peerSt"].(string)
×
341
                                                        if ok && peerSt == "up" {
×
342
                                                                upCount++
×
343
                                                        }
×
344
                                                }
345
                                        }
346
                                }
347
                        }
348
                        if upCount < 2 {
×
349
                                return true, nil
×
350
                        }
×
351
                        return false, nil
×
352
                } else {
×
353
                        return true, nil
×
354
                }
×
355
        }
356
        return false, fmt.Errorf("Invalid fabricPathDn %s ", fabricPathDn)
×
357
}
358

359
func (cont *AciController) createAciPodAnnotation(node string) (aciPodAnnot, error) {
×
360
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
361
        nodeAciPodAnnot := cont.nodeACIPod[node]
×
362
        isSingleOdev := false
×
363
        if odevCount == 1 {
×
364
                var err error
×
365
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
366
                if err != nil {
×
367
                        cont.log.Error(err)
×
368
                        return nodeAciPodAnnot, err
×
369
                }
×
370
        }
371
        if (odevCount == 0) ||
×
372
                (odevCount == 1 && !isSingleOdev) {
×
373
                if nodeAciPodAnnot.aciPod != "none" {
×
374
                        if nodeAciPodAnnot.disconnectTime.IsZero() {
×
375
                                nodeAciPodAnnot.disconnectTime = time.Now()
×
376
                        } else {
×
377
                                currentTime := time.Now()
×
378
                                diff := currentTime.Sub(nodeAciPodAnnot.disconnectTime)
×
379
                                if diff.Seconds() > float64(cont.config.OpflexDeviceReconnectWaitTimeout) {
×
380
                                        nodeAciPodAnnot.aciPod = "none"
×
381
                                        nodeAciPodAnnot.disconnectTime = time.Time{}
×
382
                                }
×
383
                        }
384
                }
385
                return nodeAciPodAnnot, nil
×
386
        } else if (odevCount == 2) ||
×
387
                (odevCount == 1 && isSingleOdev) {
×
388
                // when there is already a connected opflex device,
×
389
                // fabricPathDn will have latest pod iformation
×
390
                // and annotation will be in the form pod-<podid>-<subnet of pod>
×
391
                nodeAciPodAnnot.disconnectTime = time.Time{}
×
392
                nodeAciPod := nodeAciPodAnnot.aciPod
×
393
                pathSlice := strings.Split(fabricPathDn, "/")
×
394
                if len(pathSlice) > 1 {
×
395
                        pod := pathSlice[1]
×
396

×
397
                        // when there is difference in pod info avaliable from fabricPathDn
×
398
                        // and what we have in cache, update info in cache and change annotation on node
×
399
                        if !strings.Contains(nodeAciPod, pod) {
×
400
                                subnet, err := cont.getAciPodSubnet(pod)
×
401
                                if err != nil {
×
402
                                        cont.log.Error("Failed to get subnet of aci pod ", err.Error())
×
403
                                        return nodeAciPodAnnot, err
×
404
                                } else {
×
405
                                        nodeAciPodAnnot.aciPod = pod + "-" + subnet
×
406
                                        return nodeAciPodAnnot, nil
×
407
                                }
×
408
                        } else {
×
409
                                return nodeAciPodAnnot, nil
×
410
                        }
×
411
                } else {
×
412
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
413
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
414
                }
×
415
        }
416
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
417
}
418

419
func (cont *AciController) createNodeAciPodAnnotation(node string) (aciPodAnnot, error) {
×
420
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
421
        nodeAciPodAnnot := cont.nodeACIPodAnnot[node]
×
422
        isSingleOdev := false
×
423
        if odevCount == 1 {
×
424
                var err error
×
425
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
426
                if err != nil {
×
427
                        cont.log.Error(err)
×
428
                        return nodeAciPodAnnot, err
×
429
                }
×
430
        }
431
        if (odevCount == 0) ||
×
432
                (odevCount == 1 && !isSingleOdev) {
×
433
                if nodeAciPodAnnot.aciPod != "none" {
×
434
                        nodeAciPodAnnot.aciPod = "none"
×
435
                }
×
436
                return nodeAciPodAnnot, nil
×
437
        } else if (odevCount == 2) ||
×
438
                (odevCount == 1 && isSingleOdev) {
×
439
                pathSlice := strings.Split(fabricPathDn, "/")
×
440
                if len(pathSlice) > 1 {
×
441

×
442
                        nodeAciPodAnnot.aciPod = pathSlice[1]
×
443
                        return nodeAciPodAnnot, nil
×
444
                } else {
×
445
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
446
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
447
                }
×
448
        }
449
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
450
}
451

452
func (cont *AciController) checkChangeOfOpflexOdevAciPod() {
×
453
        var nodeAnnotationUpdates []string
×
454
        cont.apicConn.SyncMutex.Lock()
×
455
        syncDone := cont.apicConn.SyncDone
×
456
        cont.apicConn.SyncMutex.Unlock()
×
457

×
458
        if !syncDone {
×
459
                return
×
460
        }
×
461

462
        cont.indexMutex.Lock()
×
463
        for node := range cont.nodeACIPodAnnot {
×
464
                annot, err := cont.createNodeAciPodAnnotation(node)
×
465
                if err != nil {
×
466
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
467
                                now := time.Now()
×
468
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
469
                                        annot.lastErrorTime = now
×
470
                                        cont.nodeACIPodAnnot[node] = annot
×
471
                                        cont.log.Error(err.Error())
×
472
                                }
×
473
                        } else {
×
474
                                cont.log.Error(err.Error())
×
475
                        }
×
476
                } else {
×
477
                        if annot != cont.nodeACIPodAnnot[node] {
×
478
                                cont.nodeACIPodAnnot[node] = annot
×
479
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
480
                        }
×
481
                }
482
        }
483
        cont.indexMutex.Unlock()
×
484
        if len(nodeAnnotationUpdates) > 0 {
×
485
                for _, updatednode := range nodeAnnotationUpdates {
×
486
                        go cont.env.NodeAnnotationChanged(updatednode)
×
487
                }
×
488
        }
489
}
490

491
func (cont *AciController) checkChangeOfOdevAciPod() {
×
492
        var nodeAnnotationUpdates []string
×
493
        cont.apicConn.SyncMutex.Lock()
×
494
        syncDone := cont.apicConn.SyncDone
×
495
        cont.apicConn.SyncMutex.Unlock()
×
496

×
497
        if !syncDone {
×
498
                return
×
499
        }
×
500

501
        cont.indexMutex.Lock()
×
502
        for node := range cont.nodeACIPod {
×
503
                annot, err := cont.createAciPodAnnotation(node)
×
504
                if err != nil {
×
505
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
506
                                now := time.Now()
×
507
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
508
                                        annot.lastErrorTime = now
×
509
                                        cont.nodeACIPod[node] = annot
×
510
                                        cont.log.Error(err.Error())
×
511
                                }
×
512
                        } else {
×
513
                                cont.log.Error(err.Error())
×
514
                        }
×
515
                } else {
×
516
                        if annot != cont.nodeACIPod[node] {
×
517
                                cont.nodeACIPod[node] = annot
×
518
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
519
                        }
×
520
                }
521
        }
522
        cont.indexMutex.Unlock()
×
523
        if len(nodeAnnotationUpdates) > 0 {
×
524
                for _, updatednode := range nodeAnnotationUpdates {
×
525
                        go cont.env.NodeAnnotationChanged(updatednode)
×
526
                }
×
527
        }
528
}
529

530
func (cont *AciController) deleteOldOpflexDevices() {
1✔
531
        var nodeUpdates []string
1✔
532
        cont.indexMutex.Lock()
1✔
533
        for node, devices := range cont.nodeOpflexDevice {
1✔
534
                var delDevices apicapi.ApicSlice
×
535
                fabricPathDn := cont.getActiveFabricPathDn(node)
×
536
                if fabricPathDn != "" {
×
537
                        for _, device := range devices {
×
538
                                if device.GetAttrStr("delete") == "true" && device.GetAttrStr("fabricPathDn") != fabricPathDn {
×
539
                                        deleteTimeStr := device.GetAttrStr("deleteTime")
×
540
                                        deleteTime, err := time.Parse(time.RFC3339, deleteTimeStr)
×
541
                                        if err != nil {
×
542
                                                cont.log.Error("Failed to parse opflex device delete time: ", err)
×
543
                                                continue
×
544
                                        }
545
                                        now := time.Now()
×
546
                                        diff := now.Sub(deleteTime)
×
547
                                        if diff.Seconds() >= cont.config.OpflexDeviceDeleteTimeout {
×
548
                                                delDevices = append(delDevices, device)
×
549
                                        }
×
550
                                }
551
                        }
552
                        if len(delDevices) > 0 {
×
553
                                newDevices := deleteDevicesFromList(delDevices, devices)
×
554
                                cont.nodeOpflexDevice[node] = newDevices
×
555
                                cont.log.Info("Opflex device list for node ", node, " after deleting stale entries: ", cont.nodeOpflexDevice[node])
×
556
                                if len(newDevices) == 0 {
×
557
                                        delete(cont.nodeOpflexDevice, node)
×
558
                                }
×
559
                                nodeUpdates = append(nodeUpdates, node)
×
560
                        }
561
                }
562
        }
563
        cont.indexMutex.Unlock()
1✔
564
        if len(nodeUpdates) > 0 {
1✔
565
                cont.postOpflexDeviceDelete(nodeUpdates)
×
566
        }
×
567
}
568

569
// must have index lock
570
func (cont *AciController) setDeleteFlagForOldDevices(node, fabricPathDn string) {
1✔
571
        for _, device := range cont.nodeOpflexDevice[node] {
2✔
572
                if device.GetAttrStr("fabricPathDn") != fabricPathDn {
1✔
573
                        t := time.Now()
×
574
                        device.SetAttr("delete", "true")
×
575
                        device.SetAttr("deleteTime", t.Format(time.RFC3339))
×
576
                }
×
577
        }
578
}
579

580
// must have index lock
581
func (cont *AciController) fabricPathForNode(name string) (string, bool) {
1✔
582
        sz := len(cont.nodeOpflexDevice[name])
1✔
583
        for i := range cont.nodeOpflexDevice[name] {
2✔
584
                device := cont.nodeOpflexDevice[name][sz-1-i]
1✔
585
                deviceState := device.GetAttrStr("state")
1✔
586
                if deviceState == "connected" {
2✔
587
                        if deviceState != device.GetAttrStr("prevState") {
2✔
588
                                cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Info("Processing fabric path for node ",
1✔
589
                                        "when connected device state is found")
1✔
590
                                device.SetAttr("prevState", deviceState)
1✔
591
                        }
1✔
592
                        fabricPathDn := device.GetAttrStr("fabricPathDn")
1✔
593
                        cont.setDeleteFlagForOldDevices(name, fabricPathDn)
1✔
594
                        return fabricPathDn, true
1✔
595
                } else {
1✔
596
                        device.SetAttr("prevState", deviceState)
1✔
597
                }
1✔
598
        }
599
        if sz > 0 {
2✔
600
                // When the opflex-device for a node changes, for example during a live migration,
1✔
601
                // we end up with both the old and the new device objects till the old object
1✔
602
                // ages out on APIC. The new object is at end of the devices list (see opflexDeviceChanged),
1✔
603
                // so we return the fabricPathDn of the last opflex-device.
1✔
604
                cont.fabricPathLogger(cont.nodeOpflexDevice[name][sz-1].GetAttrStr("hostName"),
1✔
605
                        cont.nodeOpflexDevice[name][sz-1]).Info("Processing fabricPathDn for node")
1✔
606
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("fabricPathDn"), true
1✔
607
        }
1✔
608
        return "", false
1✔
609
}
610

611
// must have index lock
612
func (cont *AciController) deviceMacForNode(name string) (string, bool) {
1✔
613
        sz := len(cont.nodeOpflexDevice[name])
1✔
614
        if sz > 0 {
2✔
615
                // When the opflex-device for a node changes, for example when the
1✔
616
                // node is recreated, we end up with both the old and the new
1✔
617
                // device objects till the old object ages out on APIC. The
1✔
618
                // new object is at end of the devices list (see
1✔
619
                // opflexDeviceChanged), so we return the MAC address of the
1✔
620
                // last opflex-device.
1✔
621
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("mac"), true
1✔
622
        }
1✔
623
        return "", false
1✔
624
}
625

626
func apicRedirectDst(rpDn string, ip string, mac string,
627
        descr string, healthGroupDn string, enablePbrTracking bool) apicapi.ApicObject {
1✔
628
        dst := apicapi.NewVnsRedirectDest(rpDn, ip, mac).SetAttr("descr", descr)
1✔
629
        if healthGroupDn != "" && enablePbrTracking {
2✔
630
                dst.AddChild(apicapi.NewVnsRsRedirectHealthGroup(dst.GetDn(),
1✔
631
                        healthGroupDn))
1✔
632
        }
1✔
633
        return dst
1✔
634
}
635

636
func (cont *AciController) apicRedirectPol(name string, tenantName string, nodes []string,
637
        nodeMap map[string]*metadata.ServiceEndpoint,
638
        monPolDn string, enablePbrTracking bool) (apicapi.ApicObject, string) {
1✔
639
        rp := apicapi.NewVnsSvcRedirectPol(tenantName, name)
1✔
640
        rp.SetAttr("thresholdDownAction", "deny")
1✔
641
        if cont.config.DisableResilientHashing {
1✔
642
                rp.SetAttr("resilientHashEnabled", "no")
×
643
        }
×
644
        rpDn := rp.GetDn()
1✔
645
        for _, node := range nodes {
2✔
646
                cont.indexMutex.Lock()
1✔
647
                serviceEp, ok := nodeMap[node]
1✔
648
                if !ok {
1✔
649
                        continue
×
650
                }
651
                if serviceEp.Ipv4 != nil {
2✔
652
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv4.String(),
1✔
653
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
1✔
654
                }
1✔
655
                if serviceEp.Ipv6 != nil {
1✔
656
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv6.String(),
×
657
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
×
658
                }
×
659
                cont.indexMutex.Unlock()
1✔
660
        }
661
        if monPolDn != "" && enablePbrTracking {
2✔
662
                rp.AddChild(apicapi.NewVnsRsIPSLAMonitoringPol(rpDn, monPolDn))
1✔
663
        }
1✔
664
        return rp, rpDn
1✔
665
}
666

667
func apicExtNetCreate(enDn string, ingress string, ipv4 bool,
668
        cidr bool, sharedSec bool) apicapi.ApicObject {
1✔
669
        if !cidr {
2✔
670
                if ipv4 {
2✔
671
                        ingress += "/32"
1✔
672
                } else {
1✔
673
                        ingress += "/128"
×
674
                }
×
675
        }
676
        subnet := apicapi.NewL3extSubnet(enDn, ingress, "", "")
1✔
677
        if sharedSec {
2✔
678
                subnet.SetAttr("scope", "import-security,shared-security")
1✔
679
        }
1✔
680
        return subnet
1✔
681
}
682

683
func apicExtNet(name string, tenantName string, l3Out string,
684
        ingresses []string, sharedSecurity bool, snat bool) apicapi.ApicObject {
1✔
685
        en := apicapi.NewL3extInstP(tenantName, l3Out, name)
1✔
686
        enDn := en.GetDn()
1✔
687
        if snat {
2✔
688
                en.AddChild(apicapi.NewFvRsCons(enDn, name))
1✔
689
        } else {
2✔
690
                en.AddChild(apicapi.NewFvRsProv(enDn, name))
1✔
691
        }
1✔
692

693
        for _, ingress := range ingresses {
2✔
694
                ip, _, _ := net.ParseCIDR(ingress)
1✔
695
                // If ingress is a subnet
1✔
696
                if ip != nil {
2✔
697
                        if ip != nil && ip.To4() != nil {
2✔
698
                                subnet := apicExtNetCreate(enDn, ingress, true, true, sharedSecurity)
1✔
699
                                en.AddChild(subnet)
1✔
700
                        } else if ip != nil && ip.To16() != nil {
1✔
701
                                subnet := apicExtNetCreate(enDn, ingress, false, true, sharedSecurity)
×
702
                                en.AddChild(subnet)
×
703
                        }
×
704
                } else {
1✔
705
                        // If ingress is an IP address
1✔
706
                        ip := net.ParseIP(ingress)
1✔
707
                        if ip != nil && ip.To4() != nil {
2✔
708
                                subnet := apicExtNetCreate(enDn, ingress, true, false, sharedSecurity)
1✔
709
                                en.AddChild(subnet)
1✔
710
                        } else if ip != nil && ip.To16() != nil {
1✔
711
                                subnet := apicExtNetCreate(enDn, ingress, false, false, sharedSecurity)
×
712
                                en.AddChild(subnet)
×
713
                        }
×
714
                }
715
        }
716
        return en
1✔
717
}
718

719
func apicDefaultEgCons(conName string, tenantName string,
720
        appProfile string, epg string) apicapi.ApicObject {
×
721
        enDn := fmt.Sprintf("uni/tn-%s/ap-%s/epg-%s", tenantName, appProfile, epg)
×
722
        return apicapi.NewFvRsCons(enDn, conName)
×
723
}
×
724

725
func apicExtNetCons(conName string, tenantName string,
726
        l3Out string, net string) apicapi.ApicObject {
1✔
727
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
728
        return apicapi.NewFvRsCons(enDn, conName)
1✔
729
}
1✔
730

731
func apicExtNetProv(conName string, tenantName string,
732
        l3Out string, net string) apicapi.ApicObject {
1✔
733
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
734
        return apicapi.NewFvRsProv(enDn, conName)
1✔
735
}
1✔
736

737
// stringInSlice reports whether str is equal to at least one element of list.
// A nil or empty list never contains anything.
func stringInSlice(str string, list []string) bool {
	for idx := 0; idx < len(list); idx++ {
		if list[idx] == str {
			return true
		}
	}
	return false
}
746

747
func validScope(scope string) bool {
1✔
748
        validValues := []string{"", "context", "tenant", "global"}
1✔
749
        return stringInSlice(scope, validValues)
1✔
750
}
1✔
751

752
func (cont *AciController) getGraphNameFromContract(name, tenantName string) (string, error) {
×
753
        var graphName string
×
754
        args := []string{
×
755
                "query-target=subtree",
×
756
        }
×
757
        url := fmt.Sprintf("/api/node/mo/uni/tn-%s/brc-%s.json?%s", tenantName, name, strings.Join(args, "&"))
×
758
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
759
        if err != nil {
×
760
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
761
                return graphName, err
×
762
        }
×
763
        for _, obj := range apicresp.Imdata {
×
764
                for class, body := range obj {
×
765
                        if class == "vzRsSubjGraphAtt" {
×
766
                                tnVnsAbsGraphName, ok := body.Attributes["tnVnsAbsGraphName"].(string)
×
767
                                if ok {
×
768
                                        graphName = tnVnsAbsGraphName
×
769
                                }
×
770
                                break
×
771
                        }
772
                }
773
        }
774
        cont.log.Debug("graphName: ", graphName)
×
775
        return graphName, err
×
776
}
777

778
func apicContract(conName string, tenantName string,
779
        graphName string, scopeName string, isSnatPbrFltrChain bool,
780
        customSGAnnot bool) apicapi.ApicObject {
1✔
781
        con := apicapi.NewVzBrCP(tenantName, conName)
1✔
782
        if scopeName != "" && scopeName != "context" {
2✔
783
                con.SetAttr("scope", scopeName)
1✔
784
        }
1✔
785
        cs := apicapi.NewVzSubj(con.GetDn(), "loadbalancedservice")
1✔
786
        csDn := cs.GetDn()
1✔
787
        if isSnatPbrFltrChain {
2✔
788
                cs.SetAttr("revFltPorts", "no")
1✔
789
                inTerm := apicapi.NewVzInTerm(csDn)
1✔
790
                outTerm := apicapi.NewVzOutTerm(csDn)
1✔
791
                inTerm.AddChild(apicapi.NewVzRsInTermGraphAtt(inTerm.GetDn(), graphName))
1✔
792
                inTerm.AddChild(apicapi.NewVzRsFiltAtt(inTerm.GetDn(), conName+"_fromCons-toProv"))
1✔
793
                outTerm.AddChild(apicapi.NewVzRsOutTermGraphAtt(outTerm.GetDn(), graphName))
1✔
794
                outTerm.AddChild(apicapi.NewVzRsFiltAtt(outTerm.GetDn(), conName+"_fromProv-toCons"))
1✔
795
                cs.AddChild(inTerm)
1✔
796
                cs.AddChild(outTerm)
1✔
797
        } else {
2✔
798
                cs.AddChild(apicapi.NewVzRsSubjGraphAtt(csDn, graphName, customSGAnnot))
1✔
799
                cs.AddChild(apicapi.NewVzRsSubjFiltAtt(csDn, conName))
1✔
800
        }
1✔
801
        con.AddChild(cs)
1✔
802
        return con
1✔
803
}
804

805
func apicDevCtx(name string, tenantName string,
806
        graphName string, deviceName string, bdName string, rpDn string, isSnatPbrFltrChain bool) apicapi.ApicObject {
1✔
807
        cc := apicapi.NewVnsLDevCtx(tenantName, name, graphName, "loadbalancer")
1✔
808
        ccDn := cc.GetDn()
1✔
809
        graphDn := fmt.Sprintf("uni/tn-%s/lDevVip-%s", tenantName, deviceName)
1✔
810
        lifDn := fmt.Sprintf("%s/lIf-%s", graphDn, "interface")
1✔
811
        bdDn := fmt.Sprintf("uni/tn-%s/BD-%s", tenantName, bdName)
1✔
812
        cc.AddChild(apicapi.NewVnsRsLDevCtxToLDev(ccDn, graphDn))
1✔
813
        rpDnBase := rpDn
1✔
814
        for _, ctxConn := range []string{"consumer", "provider"} {
2✔
815
                lifCtx := apicapi.NewVnsLIfCtx(ccDn, ctxConn)
1✔
816
                if isSnatPbrFltrChain {
2✔
817
                        if ctxConn == "consumer" {
2✔
818
                                rpDn = rpDnBase + "_Cons"
1✔
819
                        } else {
2✔
820
                                rpDn = rpDnBase + "_Prov"
1✔
821
                        }
1✔
822
                }
823
                lifCtxDn := lifCtx.GetDn()
1✔
824
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToSvcRedirectPol(lifCtxDn, rpDn))
1✔
825
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToBD(lifCtxDn, bdDn))
1✔
826
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToLIf(lifCtxDn, lifDn))
1✔
827
                cc.AddChild(lifCtx)
1✔
828
        }
829
        return cc
1✔
830
}
831

832
func apicFilterEntry(filterDn string, count string, p_start string,
833
        p_end string, protocol string, stateful string, snat bool, outTerm bool) apicapi.ApicObject {
1✔
834
        fe := apicapi.NewVzEntry(filterDn, count)
1✔
835
        fe.SetAttr("etherT", "ip")
1✔
836
        fe.SetAttr("prot", protocol)
1✔
837
        if snat {
2✔
838
                if outTerm {
2✔
839
                        if protocol == "tcp" {
2✔
840
                                fe.SetAttr("tcpRules", "est")
1✔
841
                        }
1✔
842
                        // Reverse the ports for outTerm
843
                        fe.SetAttr("dFromPort", p_start)
1✔
844
                        fe.SetAttr("dToPort", p_end)
1✔
845
                } else {
1✔
846
                        fe.SetAttr("sFromPort", p_start)
1✔
847
                        fe.SetAttr("sToPort", p_end)
1✔
848
                }
1✔
849
        } else {
1✔
850
                fe.SetAttr("dFromPort", p_start)
1✔
851
                fe.SetAttr("dToPort", p_end)
1✔
852
        }
1✔
853
        fe.SetAttr("stateful", stateful)
1✔
854
        return fe
1✔
855
}
856
func apicFilter(name string, tenantName string,
857
        portSpec []v1.ServicePort, snat bool, snatRange portRangeSnat) apicapi.ApicObject {
1✔
858
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
859
        filterDn := filter.GetDn()
1✔
860

1✔
861
        var i int
1✔
862
        var port v1.ServicePort
1✔
863
        for i, port = range portSpec {
2✔
864
                pstr := strconv.Itoa(int(port.Port))
1✔
865
                proto := getProtocolStr(port.Protocol)
1✔
866
                fe := apicFilterEntry(filterDn, strconv.Itoa(i), pstr,
1✔
867
                        pstr, proto, "no", false, false)
1✔
868
                filter.AddChild(fe)
1✔
869
        }
1✔
870

871
        if snat {
1✔
872
                portSpec := []portRangeSnat{snatRange}
×
873
                p_start := strconv.Itoa(portSpec[0].start)
×
874
                p_end := strconv.Itoa(portSpec[0].end)
×
875

×
876
                fe1 := apicFilterEntry(filterDn, strconv.Itoa(i+1), p_start,
×
877
                        p_end, "tcp", "no", false, false)
×
878
                filter.AddChild(fe1)
×
879
                fe2 := apicFilterEntry(filterDn, strconv.Itoa(i+2), p_start,
×
880
                        p_end, "udp", "no", false, false)
×
881
                filter.AddChild(fe2)
×
882
        }
×
883
        return filter
1✔
884
}
885

886
func apicFilterSnat(name string, tenantName string,
887
        portSpec []portRangeSnat, outTerm bool) apicapi.ApicObject {
1✔
888
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
889
        filterDn := filter.GetDn()
1✔
890

1✔
891
        p_start := strconv.Itoa(portSpec[0].start)
1✔
892
        p_end := strconv.Itoa(portSpec[0].end)
1✔
893

1✔
894
        fe := apicFilterEntry(filterDn, "0", p_start,
1✔
895
                p_end, "tcp", "no", true, outTerm)
1✔
896
        filter.AddChild(fe)
1✔
897
        fe1 := apicFilterEntry(filterDn, "1", p_start,
1✔
898
                p_end, "udp", "no", true, outTerm)
1✔
899
        filter.AddChild(fe1)
1✔
900

1✔
901
        return filter
1✔
902
}
1✔
903

904
func (cont *AciController) updateServiceDeviceInstance(key string,
905
        service *v1.Service) error {
1✔
906
        cont.indexMutex.Lock()
1✔
907
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
908
        cont.serviceEndPoints.GetnodesMetadata(key, service, nodeMap)
1✔
909
        cont.indexMutex.Unlock()
1✔
910

1✔
911
        var nodes []string
1✔
912
        for node := range nodeMap {
2✔
913
                nodes = append(nodes, node)
1✔
914
        }
1✔
915
        sort.Strings(nodes)
1✔
916
        name := cont.aciNameForKey("svc", key)
1✔
917
        var conScope string
1✔
918
        scopeVal, ok := service.ObjectMeta.Annotations[metadata.ServiceContractScopeAnnotation]
1✔
919
        if ok {
2✔
920
                normScopeVal := strings.ToLower(scopeVal)
1✔
921
                if !validScope(normScopeVal) {
1✔
922
                        errString := "Invalid service contract scope value provided " + scopeVal
×
923
                        err := errors.New(errString)
×
924
                        serviceLogger(cont.log, service).Error("Could not create contract: ", err)
×
925
                        return err
×
926
                } else {
1✔
927
                        conScope = normScopeVal
1✔
928
                }
1✔
929
        } else {
1✔
930
                conScope = DefaultServiceContractScope
1✔
931
        }
1✔
932

933
        var sharedSecurity bool
1✔
934
        if conScope == "global" {
2✔
935
                sharedSecurity = true
1✔
936
        } else {
2✔
937
                sharedSecurity = DefaultServiceExtSubNetShared
1✔
938
        }
1✔
939

940
        graphName := cont.aciNameForKey("svc", "global")
1✔
941
        deviceName := cont.aciNameForKey("svc", "global")
1✔
942
        _, customSGAnnPresent := service.ObjectMeta.Annotations[metadata.ServiceGraphNameAnnotation]
1✔
943
        if customSGAnnPresent {
1✔
944
                customSG, err := cont.getGraphNameFromContract(name, cont.config.AciVrfTenant)
×
945
                if err == nil {
×
946
                        graphName = customSG
×
947
                }
×
948
        }
949
        cont.log.Debug("Using service graph ", graphName, " for service ", key)
1✔
950

1✔
951
        var serviceObjs apicapi.ApicSlice
1✔
952
        if len(nodes) > 0 {
2✔
953
                // 1. Service redirect policy
1✔
954
                // The service redirect policy contains the MAC address
1✔
955
                // and IP address of each of the service endpoints for
1✔
956
                // each node that hosts a pod for this service.  The
1✔
957
                // example below shows the case of two nodes.
1✔
958
                rp, rpDn :=
1✔
959
                        cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
1✔
960
                                nodeMap, cont.staticMonPolDn(), cont.config.AciPbrTrackingNonSnat)
1✔
961
                serviceObjs = append(serviceObjs, rp)
1✔
962

1✔
963
                // 2. Service graph contract and external network
1✔
964
                // The service graph contract must be bound to the service
1✔
965
                // graph.  This contract must be consumed by the default
1✔
966
                // layer 3 network and provided by the service layer 3
1✔
967
                // network.
1✔
968
                {
2✔
969
                        var ingresses []string
1✔
970
                        for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
971
                                ingresses = append(ingresses, ingress.IP)
1✔
972
                        }
1✔
973
                        serviceObjs = append(serviceObjs,
1✔
974
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
975
                                        cont.config.AciL3Out, ingresses, sharedSecurity, false))
1✔
976
                }
977

978
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, false, customSGAnnPresent)
1✔
979
                serviceObjs = append(serviceObjs, contract)
1✔
980
                for _, net := range cont.config.AciExtNetworks {
2✔
981
                        serviceObjs = append(serviceObjs,
1✔
982
                                apicExtNetCons(name, cont.config.AciVrfTenant,
1✔
983
                                        cont.config.AciL3Out, net))
1✔
984
                }
1✔
985

986
                if cont.config.AddExternalContractToDefaultEPG && service.Spec.Type == v1.ServiceTypeLoadBalancer {
1✔
987
                        defaultEpgTenant := cont.config.DefaultEg.PolicySpace
×
988
                        defaultEpgStringSplit := strings.Split(cont.config.DefaultEg.Name, "|")
×
989
                        var defaultEpgName, appProfile string
×
990
                        if len(defaultEpgStringSplit) > 1 {
×
991
                                appProfile = defaultEpgStringSplit[0]
×
992
                                defaultEpgName = defaultEpgStringSplit[1]
×
993
                        } else {
×
994
                                appProfile = cont.config.AppProfile
×
995
                                defaultEpgName = defaultEpgStringSplit[0]
×
996
                        }
×
997
                        serviceObjs = append(serviceObjs,
×
998
                                apicDefaultEgCons(name, defaultEpgTenant, appProfile, defaultEpgName))
×
999
                }
1000

1001
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1002
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1003

1✔
1004
                _, snat := cont.snatServices[key]
1✔
1005
                filter := apicFilter(name, cont.config.AciVrfTenant,
1✔
1006
                        service.Spec.Ports, snat, defaultPortRange)
1✔
1007
                serviceObjs = append(serviceObjs, filter)
1✔
1008

1✔
1009
                // 3. Device cluster context
1✔
1010
                // The logical device context binds the service contract
1✔
1011
                // to the redirect policy and the device cluster and
1✔
1012
                // bridge domain for the device cluster.
1✔
1013
                serviceObjs = append(serviceObjs,
1✔
1014
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, deviceName,
1✔
1015
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, false))
1✔
1016
        }
1017

1018
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1019
        return nil
1✔
1020
}
1021

1022
func (cont *AciController) updateServiceDeviceInstanceSnat(key string) error {
1✔
1023
        nodeList := cont.nodeIndexer.List()
1✔
1024
        cont.indexMutex.Lock()
1✔
1025
        if len(cont.nodeServiceMetaCache) == 0 {
2✔
1026
                cont.indexMutex.Unlock()
1✔
1027
                return nil
1✔
1028
        }
1✔
1029
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
1030
        sort.Slice(nodeList, func(i, j int) bool {
2✔
1031
                nodeA := nodeList[i].(*v1.Node)
1✔
1032
                nodeB := nodeList[j].(*v1.Node)
1✔
1033
                return nodeA.ObjectMeta.Name < nodeB.ObjectMeta.Name
1✔
1034
        })
1✔
1035
        for itr, nodeItem := range nodeList {
2✔
1036
                if itr == cont.config.MaxSvcGraphNodes {
1✔
1037
                        break
×
1038
                }
1039
                node := nodeItem.(*v1.Node)
1✔
1040
                nodeName := node.ObjectMeta.Name
1✔
1041
                nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
1042
                if !ok {
2✔
1043
                        continue
1✔
1044
                }
1045
                _, ok = cont.fabricPathForNode(nodeName)
1✔
1046
                if !ok {
1✔
1047
                        continue
×
1048
                }
1049
                nodeLabels := node.ObjectMeta.Labels
1✔
1050
                excludeNode := cont.nodeLabelsInExcludeList(nodeLabels)
1✔
1051
                if excludeNode {
1✔
1052
                        continue
×
1053
                }
1054
                nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
1055
        }
1056
        cont.indexMutex.Unlock()
1✔
1057

1✔
1058
        var nodes []string
1✔
1059
        for node := range nodeMap {
2✔
1060
                nodes = append(nodes, node)
1✔
1061
        }
1✔
1062
        sort.Strings(nodes)
1✔
1063
        name := cont.aciNameForKey("snat", key)
1✔
1064
        var conScope = cont.config.SnatSvcContractScope
1✔
1065
        sharedSecurity := true
1✔
1066

1✔
1067
        graphName := cont.aciNameForKey("svc", "global")
1✔
1068
        var serviceObjs apicapi.ApicSlice
1✔
1069
        if len(nodes) > 0 {
2✔
1070
                // 1. Service redirect policy
1✔
1071
                // The service redirect policy contains the MAC address
1✔
1072
                // and IP address of each of the service endpoints for
1✔
1073
                // each node that hosts a pod for this service.
1✔
1074
                // For SNAT with the introduction of filter-chain usage, to work-around
1✔
1075
                // an APIC limitation, creating two PBR policies with same nodes.
1✔
1076
                var rpDn string
1✔
1077
                var rp apicapi.ApicObject
1✔
1078
                if cont.apicConn.SnatPbrFltrChain {
2✔
1079
                        rpCons, rpDnCons :=
1✔
1080
                                cont.apicRedirectPol(name+"_Cons", cont.config.AciVrfTenant, nodes,
1✔
1081
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1082
                        serviceObjs = append(serviceObjs, rpCons)
1✔
1083
                        rpProv, _ :=
1✔
1084
                                cont.apicRedirectPol(name+"_Prov", cont.config.AciVrfTenant, nodes,
1✔
1085
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1086
                        serviceObjs = append(serviceObjs, rpProv)
1✔
1087
                        rpDn = strings.TrimSuffix(rpDnCons, "_Cons")
1✔
1088
                } else {
1✔
1089
                        rp, rpDn =
×
1090
                                cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
×
1091
                                        nodeMap, cont.staticMonPolDn(), true)
×
1092
                        serviceObjs = append(serviceObjs, rp)
×
1093
                }
×
1094
                // 2. Service graph contract and external network
1095
                // The service graph contract must be bound to the
1096
                // service
1097
                // graph.  This contract must be consumed by the default
1098
                // layer 3 network and provided by the service layer 3
1099
                // network.
1100
                {
1✔
1101
                        var ingresses []string
1✔
1102
                        for _, policy := range cont.snatPolicyCache {
2✔
1103
                                ingresses = append(ingresses, policy.SnatIp...)
1✔
1104
                        }
1✔
1105
                        serviceObjs = append(serviceObjs,
1✔
1106
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1107
                                        cont.config.AciL3Out, ingresses, sharedSecurity, true))
1✔
1108
                }
1109

1110
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, cont.apicConn.SnatPbrFltrChain, false)
1✔
1111
                serviceObjs = append(serviceObjs, contract)
1✔
1112

1✔
1113
                for _, net := range cont.config.AciExtNetworks {
2✔
1114
                        serviceObjs = append(serviceObjs,
1✔
1115
                                apicExtNetProv(name, cont.config.AciVrfTenant,
1✔
1116
                                        cont.config.AciL3Out, net))
1✔
1117
                }
1✔
1118

1119
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1120
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1121
                portSpec := []portRangeSnat{defaultPortRange}
1✔
1122
                if cont.apicConn.SnatPbrFltrChain {
2✔
1123
                        filterIn := apicFilterSnat(name+"_fromCons-toProv", cont.config.AciVrfTenant, portSpec, false)
1✔
1124
                        serviceObjs = append(serviceObjs, filterIn)
1✔
1125
                        filterOut := apicFilterSnat(name+"_fromProv-toCons", cont.config.AciVrfTenant, portSpec, true)
1✔
1126
                        serviceObjs = append(serviceObjs, filterOut)
1✔
1127
                } else {
1✔
1128
                        filter := apicFilterSnat(name, cont.config.AciVrfTenant, portSpec, false)
×
1129
                        serviceObjs = append(serviceObjs, filter)
×
1130
                }
×
1131
                // 3. Device cluster context
1132
                // The logical device context binds the service contract
1133
                // to the redirect policy and the device cluster and
1134
                // bridge domain for the device cluster.
1135
                serviceObjs = append(serviceObjs,
1✔
1136
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, graphName,
1✔
1137
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, cont.apicConn.SnatPbrFltrChain))
1✔
1138
        }
1139
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1140
        return nil
1✔
1141
}
1142

1143
func (cont *AciController) nodeLabelsInExcludeList(Labels map[string]string) bool {
1✔
1144
        nodeSnatRedirectExclude := cont.config.NodeSnatRedirectExclude
1✔
1145

1✔
1146
        for _, nodeGroup := range nodeSnatRedirectExclude {
1✔
1147
                if len(nodeGroup.Labels) == 0 {
×
1148
                        continue
×
1149
                }
1150
                matchFound := true
×
1151
                for _, label := range nodeGroup.Labels {
×
1152
                        if _, ok := Labels["node-role.kubernetes.io/"+label]; !ok {
×
1153
                                matchFound = false
×
1154
                                break
×
1155
                        }
1156
                }
1157
                if matchFound {
×
1158
                        return true
×
1159
                }
×
1160
        }
1161
        return false
1✔
1162
}
1163

1164
func (cont *AciController) queueServiceUpdateByKey(key string) {
1✔
1165
        cont.serviceQueue.Add(key)
1✔
1166
}
1✔
1167

1168
func (cont *AciController) queueServiceUpdate(service *v1.Service) {
1✔
1169
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
1170
        if err != nil {
1✔
1171
                serviceLogger(cont.log, service).
×
1172
                        Error("Could not create service key: ", err)
×
1173
                return
×
1174
        }
×
1175
        cont.serviceQueue.Add(key)
1✔
1176
}
1177

1178
func apicDeviceCluster(name string, vrfTenant string,
1179
        physDom string, encap string,
1180
        nodes []string, nodeMap map[string]string) (apicapi.ApicObject, string) {
1✔
1181
        dc := apicapi.NewVnsLDevVip(vrfTenant, name)
1✔
1182
        dc.SetAttr("managed", "no")
1✔
1183
        dcDn := dc.GetDn()
1✔
1184
        dc.AddChild(apicapi.NewVnsRsALDevToPhysDomP(dcDn,
1✔
1185
                fmt.Sprintf("uni/phys-%s", physDom)))
1✔
1186
        lif := apicapi.NewVnsLIf(dcDn, "interface")
1✔
1187
        lif.SetAttr("encap", encap)
1✔
1188
        lifDn := lif.GetDn()
1✔
1189

1✔
1190
        for _, node := range nodes {
2✔
1191
                path, ok := nodeMap[node]
1✔
1192
                if !ok {
1✔
1193
                        continue
×
1194
                }
1195

1196
                cdev := apicapi.NewVnsCDev(dcDn, node)
1✔
1197
                cif := apicapi.NewVnsCif(cdev.GetDn(), "interface")
1✔
1198
                cif.AddChild(apicapi.NewVnsRsCIfPathAtt(cif.GetDn(), path))
1✔
1199
                cdev.AddChild(cif)
1✔
1200
                lif.AddChild(apicapi.NewVnsRsCIfAttN(lifDn, cif.GetDn()))
1✔
1201
                dc.AddChild(cdev)
1✔
1202
        }
1203

1204
        dc.AddChild(lif)
1✔
1205

1✔
1206
        return dc, dcDn
1✔
1207
}
1208

1209
func apicServiceGraph(name string, tenantName string,
1210
        dcDn string) apicapi.ApicObject {
1✔
1211
        sg := apicapi.NewVnsAbsGraph(tenantName, name)
1✔
1212
        sgDn := sg.GetDn()
1✔
1213
        var provDn string
1✔
1214
        var consDn string
1✔
1215
        var cTermDn string
1✔
1216
        var pTermDn string
1✔
1217
        {
2✔
1218
                an := apicapi.NewVnsAbsNode(sgDn, "loadbalancer")
1✔
1219
                an.SetAttr("managed", "no")
1✔
1220
                an.SetAttr("routingMode", "Redirect")
1✔
1221
                anDn := an.GetDn()
1✔
1222
                cons := apicapi.NewVnsAbsFuncConn(anDn, "consumer")
1✔
1223
                consDn = cons.GetDn()
1✔
1224
                an.AddChild(cons)
1✔
1225
                prov := apicapi.NewVnsAbsFuncConn(anDn, "provider")
1✔
1226
                provDn = prov.GetDn()
1✔
1227
                an.AddChild(prov)
1✔
1228
                an.AddChild(apicapi.NewVnsRsNodeToLDev(anDn, dcDn))
1✔
1229
                sg.AddChild(an)
1✔
1230
        }
1✔
1231
        {
1✔
1232
                tnc := apicapi.NewVnsAbsTermNodeCon(sgDn, "T1")
1✔
1233
                tncDn := tnc.GetDn()
1✔
1234
                cTerm := apicapi.NewVnsAbsTermConn(tncDn)
1✔
1235
                cTermDn = cTerm.GetDn()
1✔
1236
                tnc.AddChild(cTerm)
1✔
1237
                tnc.AddChild(apicapi.NewVnsInTerm(tncDn))
1✔
1238
                tnc.AddChild(apicapi.NewVnsOutTerm(tncDn))
1✔
1239
                sg.AddChild(tnc)
1✔
1240
        }
1✔
1241
        {
1✔
1242
                tnp := apicapi.NewVnsAbsTermNodeProv(sgDn, "T2")
1✔
1243
                tnpDn := tnp.GetDn()
1✔
1244
                pTerm := apicapi.NewVnsAbsTermConn(tnpDn)
1✔
1245
                pTermDn = pTerm.GetDn()
1✔
1246
                tnp.AddChild(pTerm)
1✔
1247
                tnp.AddChild(apicapi.NewVnsInTerm(tnpDn))
1✔
1248
                tnp.AddChild(apicapi.NewVnsOutTerm(tnpDn))
1✔
1249
                sg.AddChild(tnp)
1✔
1250
        }
1✔
1251
        {
1✔
1252
                acc := apicapi.NewVnsAbsConnection(sgDn, "C1")
1✔
1253
                acc.SetAttr("connDir", "provider")
1✔
1254
                accDn := acc.GetDn()
1✔
1255
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, consDn))
1✔
1256
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, cTermDn))
1✔
1257
                sg.AddChild(acc)
1✔
1258
        }
1✔
1259
        {
1✔
1260
                acp := apicapi.NewVnsAbsConnection(sgDn, "C2")
1✔
1261
                acp.SetAttr("connDir", "provider")
1✔
1262
                acpDn := acp.GetDn()
1✔
1263
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, provDn))
1✔
1264
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, pTermDn))
1✔
1265
                sg.AddChild(acp)
1✔
1266
        }
1✔
1267
        return sg
1✔
1268
}
1269
func (cont *AciController) updateDeviceCluster() {
1✔
1270
        nodeMap := make(map[string]string)
1✔
1271
        cont.indexMutex.Lock()
1✔
1272
        for node := range cont.nodeOpflexDevice {
2✔
1273
                cont.log.Debug("Processing node in nodeOpflexDevice cache : ", node)
1✔
1274
                fabricPath, ok := cont.fabricPathForNode(node)
1✔
1275
                if !ok {
2✔
1276
                        continue
1✔
1277
                }
1278
                nodeMap[node] = fabricPath
1✔
1279
        }
1280

1281
        // For clusters other than OpenShift On OpenStack,
1282
        // openStackFabricPathDnMap will be empty
1283
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1284
                nodeMap[host] = opflexOdevInfo.fabricPathDn
×
1285
        }
×
1286

1287
        // For OpenShift On OpenStack clusters,
1288
        // hostFabricPathDnMap will be empty
1289
        for _, hostInfo := range cont.hostFabricPathDnMap {
1✔
1290
                if hostInfo.fabricPathDn != "" {
×
1291
                        nodeMap[hostInfo.host] = hostInfo.fabricPathDn
×
1292
                }
×
1293
        }
1294
        cont.indexMutex.Unlock()
1✔
1295

1✔
1296
        var nodes []string
1✔
1297
        for node := range nodeMap {
2✔
1298
                nodes = append(nodes, node)
1✔
1299
        }
1✔
1300
        sort.Strings(nodes)
1✔
1301

1✔
1302
        name := cont.aciNameForKey("svc", "global")
1✔
1303
        var serviceObjs apicapi.ApicSlice
1✔
1304

1✔
1305
        // 1. Device cluster:
1✔
1306
        // The device cluster is a set of physical paths that need to be
1✔
1307
        // created for each node in the cluster, that correspond to the
1✔
1308
        // service interface for each node.
1✔
1309
        dc, dcDn := apicDeviceCluster(name, cont.config.AciVrfTenant,
1✔
1310
                cont.config.AciServicePhysDom, cont.config.AciServiceEncap,
1✔
1311
                nodes, nodeMap)
1✔
1312
        serviceObjs = append(serviceObjs, dc)
1✔
1313

1✔
1314
        // 2. Service graph template
1✔
1315
        // The service graph controls how the traffic will be redirected.
1✔
1316
        // A service graph must be created for each device cluster.
1✔
1317
        serviceObjs = append(serviceObjs,
1✔
1318
                apicServiceGraph(name, cont.config.AciVrfTenant, dcDn))
1✔
1319

1✔
1320
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1321
}
1322

1323
func (cont *AciController) fabricPathLogger(node string,
1324
        obj apicapi.ApicObject) *logrus.Entry {
1✔
1325
        return cont.log.WithFields(logrus.Fields{
1✔
1326
                "fabricPath": obj.GetAttr("fabricPathDn"),
1✔
1327
                "mac":        obj.GetAttr("mac"),
1✔
1328
                "node":       node,
1✔
1329
                "obj":        obj,
1✔
1330
        })
1✔
1331
}
1✔
1332

1333
func (cont *AciController) setOpenStackSystemId() string {
×
1334

×
1335
        // 1) get opflexIDEp with containerName == <node name of any one of the openshift nodes>
×
1336
        // 2) extract OpenStack system id from compHvDn attribute
×
1337
        //    comp/prov-OpenStack/ctrlr-[k8s-scale]-k8s-scale/hv-overcloud-novacompute-0 - sample compHvDn,
×
1338
        //    where k8s-scale is the system id
×
1339

×
1340
        var systemId string
×
1341
        nodeList := cont.nodeIndexer.List()
×
1342
        if len(nodeList) < 1 {
×
1343
                return systemId
×
1344
        }
×
1345
        node := nodeList[0].(*v1.Node)
×
1346
        nodeName := node.ObjectMeta.Name
×
1347
        opflexIDEpFilter := fmt.Sprintf("query-target-filter=and(eq(opflexIDEp.containerName,\"%s\"))", nodeName)
×
1348
        opflexIDEpArgs := []string{
×
1349
                opflexIDEpFilter,
×
1350
        }
×
1351
        url := fmt.Sprintf("/api/node/class/opflexIDEp.json?%s", strings.Join(opflexIDEpArgs, "&"))
×
1352
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1353
        if err != nil {
×
1354
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1355
                return systemId
×
1356
        }
×
1357
        for _, obj := range apicresp.Imdata {
×
1358
                for _, body := range obj {
×
1359
                        compHvDn, ok := body.Attributes["compHvDn"].(string)
×
1360
                        if ok {
×
1361
                                systemId = compHvDn[strings.IndexByte(compHvDn, '[')+1 : strings.IndexByte(compHvDn, ']')]
×
1362
                                break
×
1363
                        }
1364
                }
1365
        }
1366
        cont.indexMutex.Lock()
×
1367
        cont.openStackSystemId = systemId
×
1368
        cont.log.Info("Setting OpenStack system id : ", cont.openStackSystemId)
×
1369
        cont.indexMutex.Unlock()
×
1370
        return systemId
×
1371
}
1372

1373
// Returns true when a new OpenStack opflexODev is added
1374
func (cont *AciController) openStackOpflexOdevUpdate(obj apicapi.ApicObject) bool {
×
1375

×
1376
        // If opflexOdev compHvDn contains comp/prov-OpenShift/ctrlr-[<systemid>]-<systemid>,
×
1377
        // it means that it is an OpenStack OpflexOdev which belongs to OpenStack with system id <systemid>
×
1378

×
1379
        var deviceClusterUpdate bool
×
1380
        compHvDn := obj.GetAttrStr("compHvDn")
×
1381
        if strings.Contains(compHvDn, "prov-OpenStack") {
×
1382
                cont.indexMutex.Lock()
×
1383
                systemId := cont.openStackSystemId
×
1384
                cont.indexMutex.Unlock()
×
1385
                if systemId == "" {
×
1386
                        systemId = cont.setOpenStackSystemId()
×
1387
                }
×
1388
                if systemId == "" {
×
1389
                        cont.log.Error("Failed  to get OpenStack system id")
×
1390
                        return deviceClusterUpdate
×
1391
                }
×
1392
                prefix := fmt.Sprintf("comp/prov-OpenStack/ctrlr-[%s]-%s", systemId, systemId)
×
1393
                if strings.Contains(compHvDn, prefix) {
×
1394
                        cont.log.Info("Received notification for OpenStack opflexODev update, hostName: ",
×
1395
                                obj.GetAttrStr("hostName"), " dn: ", obj.GetAttrStr("dn"))
×
1396
                        cont.indexMutex.Lock()
×
1397
                        opflexOdevInfo, ok := cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")]
×
1398
                        if ok {
×
1399
                                opflexOdevInfo.opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1400
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = opflexOdevInfo
×
1401
                        } else {
×
1402
                                var openstackopflexodevinfo openstackOpflexOdevInfo
×
1403
                                opflexODevDn := make(map[string]struct{})
×
1404
                                opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1405
                                openstackopflexodevinfo.fabricPathDn = obj.GetAttrStr("fabricPathDn")
×
1406
                                openstackopflexodevinfo.opflexODevDn = opflexODevDn
×
1407
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = openstackopflexodevinfo
×
1408
                                deviceClusterUpdate = true
×
1409
                        }
×
1410
                        cont.indexMutex.Unlock()
×
1411
                }
1412
        }
1413
        return deviceClusterUpdate
×
1414
}
1415

1416
func (cont *AciController) infraRtAttEntPDeleted(dn string) {
×
1417
        // dn format : uni/infra/attentp-k8s-scale-esxi-aaep/rtattEntP-[uni/infra/funcprof/accbundle-esxi1-vpc-ipg]
×
1418
        cont.log.Info("Processing delete of infraRtAttEntP: ", dn)
×
1419

×
1420
        // extract uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1421
        re := regexp.MustCompile(`\[(.*?)\]`)
×
1422
        matches := re.FindStringSubmatch(dn)
×
1423

×
1424
        if len(matches) < 2 {
×
1425
                cont.log.Error("Failed to extract ipg from dn : ", dn)
×
1426
                return
×
1427
        }
×
1428
        tdn := matches[1]
×
1429

×
1430
        cont.indexMutex.Lock()
×
1431
        _, ok := cont.hostFabricPathDnMap[tdn]
×
1432
        if ok {
×
1433
                delete(cont.hostFabricPathDnMap, tdn)
×
1434
                cont.log.Info("Deleted ipg : ", tdn)
×
1435
        }
×
1436
        cont.indexMutex.Unlock()
×
1437

×
1438
        if ok {
×
1439
                cont.updateDeviceCluster()
×
1440
        }
×
1441
}
1442

1443
func (cont *AciController) vpcIfDeleted(dn string) {
×
1444
        var deleted bool
×
1445
        cont.indexMutex.Lock()
×
1446
        for tDn, hostInfo := range cont.hostFabricPathDnMap {
×
1447
                if _, present := hostInfo.vpcIfDn[dn]; present {
×
1448
                        cont.log.Info("Deleting vpcIf, dn :", dn)
×
1449
                        delete(hostInfo.vpcIfDn, dn)
×
1450
                        if len(hostInfo.vpcIfDn) == 0 {
×
1451
                                cont.log.Infof("Removing fabricPathDn(%s) of ipg : %s ", hostInfo.fabricPathDn, hostInfo.host)
×
1452
                                hostInfo.fabricPathDn = ""
×
1453
                                deleted = true
×
1454
                        }
×
1455
                        cont.hostFabricPathDnMap[tDn] = hostInfo
×
1456
                }
1457
        }
1458
        cont.indexMutex.Unlock()
×
1459
        if deleted {
×
1460
                cont.updateDeviceCluster()
×
1461
        }
×
1462
}
1463

1464
func (cont *AciController) vpcIfChanged(obj apicapi.ApicObject) {
×
1465
        if cont.updateHostFabricPathDnMap(obj) {
×
1466
                cont.updateDeviceCluster()
×
1467
        }
×
1468
}
1469

1470
func (cont *AciController) updateHostFabricPathDnMap(obj apicapi.ApicObject) bool {
×
1471
        var accBndlGrpDn, fabricPathDn, dn string
×
1472
        for _, body := range obj {
×
1473
                var ok bool
×
1474
                accBndlGrpDn, ok = body.Attributes["accBndlGrpDn"].(string)
×
1475
                if !ok || (ok && accBndlGrpDn == "") {
×
1476
                        cont.log.Error("accBndlGrpDn missing/empty in vpcIf")
×
1477
                        return false
×
1478
                }
×
1479
                fabricPathDn, ok = body.Attributes["fabricPathDn"].(string)
×
1480
                if !ok && (ok && fabricPathDn == "") {
×
1481
                        cont.log.Error("fabricPathDn missing/empty in vpcIf")
×
1482
                        return false
×
1483
                }
×
1484
                dn, ok = body.Attributes["dn"].(string)
×
1485
                if !ok && (ok && dn == "") {
×
1486
                        cont.log.Error("dn missing/empty in vpcIf")
×
1487
                        return false
×
1488
                }
×
1489
        }
1490
        var updated bool
×
1491
        cont.indexMutex.Lock()
×
1492
        // If accBndlGrpDn exists in hostFabricPathDnMap, the vpcIf belongs to the cluster AEP
×
1493
        hostInfo, exists := cont.hostFabricPathDnMap[accBndlGrpDn]
×
1494
        if exists {
×
1495
                if _, present := hostInfo.vpcIfDn[dn]; !present {
×
1496
                        hostInfo.vpcIfDn[dn] = struct{}{}
×
1497
                        cont.log.Infof("vpcIf processing, dn : %s, accBndlGrpDn: %s", dn, accBndlGrpDn)
×
1498
                }
×
1499
                if hostInfo.fabricPathDn != fabricPathDn {
×
1500
                        hostInfo.fabricPathDn = fabricPathDn
×
1501
                        cont.log.Info("Updated fabricPathDn of ipg :", hostInfo.host, " to: ", hostInfo.fabricPathDn)
×
1502
                        updated = true
×
1503
                }
×
1504
                cont.hostFabricPathDnMap[accBndlGrpDn] = hostInfo
×
1505
        }
1506
        cont.indexMutex.Unlock()
×
1507
        return updated
×
1508
}
1509

1510
func (cont *AciController) infraRtAttEntPChanged(obj apicapi.ApicObject) {
×
1511
        var tdn string
×
1512
        for _, body := range obj {
×
1513
                var ok bool
×
1514
                tdn, ok = body.Attributes["tDn"].(string)
×
1515
                if !ok || (ok && tdn == "") {
×
1516
                        cont.log.Error("tDn missing/empty in infraRtAttEntP")
×
1517
                        return
×
1518
                }
×
1519
        }
1520
        var updated bool
×
1521
        cont.log.Info("infraRtAttEntP updated, tDn : ", tdn)
×
1522

×
1523
        // tdn format for vpc : /uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1524
        // tdn format for single leaf : /uni/infra/funcprof/accportgrp-IPG_CLIENT_SIM
×
1525

×
1526
        // Ignore processing of single leaf
×
1527
        if !strings.Contains(tdn, "/accbundle-") {
×
1528
                cont.log.Info("Skipping processing of infraRtAttEntP update, not applicable for non-VPC configuration: ", tdn)
×
1529
                return
×
1530
        }
×
1531

1532
        // extract esxi1-vpc-ipg
1533
        parts := strings.Split(tdn, "/")
×
1534
        lastPart := parts[len(parts)-1]
×
1535
        host := strings.TrimPrefix(lastPart, "accbundle-")
×
1536

×
1537
        // adding entry for ipg in hostFabricPathDnMap
×
1538
        cont.indexMutex.Lock()
×
1539
        _, exists := cont.hostFabricPathDnMap[tdn]
×
1540
        if !exists {
×
1541
                var hostInfo hostFabricInfo
×
1542
                hostInfo.host = host
×
1543
                hostInfo.vpcIfDn = make(map[string]struct{})
×
1544
                cont.hostFabricPathDnMap[tdn] = hostInfo
×
1545
        }
×
1546
        cont.indexMutex.Unlock()
×
1547

×
1548
        accBndlGrpFilter := fmt.Sprintf(`query-target-filter=and(eq(vpcIf.accBndlGrpDn,"%s"))`, tdn)
×
1549
        url := fmt.Sprintf("/api/class/vpcIf.json?%s", accBndlGrpFilter)
×
1550
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1551
        if err != nil {
×
1552
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1553
                return
×
1554
        }
×
1555

1556
        for _, obj := range apicresp.Imdata {
×
1557
                if cont.updateHostFabricPathDnMap(obj) && !updated {
×
1558
                        updated = true
×
1559
                }
×
1560
        }
1561

1562
        if updated {
×
1563
                cont.updateDeviceCluster()
×
1564
        }
×
1565
        return
×
1566
}
1567

1568
func (cont *AciController) opflexDeviceChanged(obj apicapi.ApicObject) {
1✔
1569
        devType := obj.GetAttrStr("devType")
1✔
1570
        domName := obj.GetAttrStr("domName")
1✔
1571
        ctrlrName := obj.GetAttrStr("ctrlrName")
1✔
1572

1✔
1573
        if !cont.config.DisableServiceVlanPreprovisioning && strings.Contains(cont.config.Flavor, "openstack") {
1✔
1574
                if cont.openStackOpflexOdevUpdate(obj) {
×
1575
                        cont.log.Info("OpenStack opflexODev for ", obj.GetAttrStr("hostName"), " is added")
×
1576
                        cont.updateDeviceCluster()
×
1577
                }
×
1578
        }
1579
        if (devType == cont.env.OpFlexDeviceType()) && (domName == cont.config.AciVmmDomain) && (ctrlrName == cont.config.AciVmmController) {
2✔
1580
                cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Processing opflex device update")
1✔
1581
                if obj.GetAttrStr("state") == "disconnected" {
2✔
1582
                        cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Opflex device disconnected")
1✔
1583
                        cont.indexMutex.Lock()
1✔
1584
                        for node, devices := range cont.nodeOpflexDevice {
1✔
1585
                                if node == obj.GetAttrStr("hostName") {
×
1586
                                        for _, device := range devices {
×
1587
                                                if device.GetDn() == obj.GetDn() {
×
1588
                                                        device.SetAttr("state", "disconnected")
×
1589
                                                        cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Debug("Opflex device cache updated for disconnected node")
×
1590
                                                }
×
1591
                                        }
1592
                                        cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", devices)
×
1593
                                        break
×
1594
                                }
1595
                        }
1596
                        cont.indexMutex.Unlock()
1✔
1597
                        cont.updateDeviceCluster()
1✔
1598
                        return
1✔
1599
                }
1600
                var nodeUpdates []string
1✔
1601

1✔
1602
                cont.indexMutex.Lock()
1✔
1603
                nodefound := false
1✔
1604
                for node, devices := range cont.nodeOpflexDevice {
2✔
1605
                        found := false
1✔
1606

1✔
1607
                        if node == obj.GetAttrStr("hostName") {
2✔
1608
                                nodefound = true
1✔
1609
                        }
1✔
1610

1611
                        for i, device := range devices {
2✔
1612
                                if device.GetDn() != obj.GetDn() {
2✔
1613
                                        continue
1✔
1614
                                }
1615
                                found = true
1✔
1616

1✔
1617
                                if obj.GetAttrStr("hostName") != node {
2✔
1618
                                        cont.fabricPathLogger(node, device).
1✔
1619
                                                Debug("Moving opflex device from node")
1✔
1620

1✔
1621
                                        devices = append(devices[:i], devices[i+1:]...)
1✔
1622
                                        cont.nodeOpflexDevice[node] = devices
1✔
1623
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1624
                                        break
1✔
1625
                                } else if (device.GetAttrStr("mac") != obj.GetAttrStr("mac")) ||
1✔
1626
                                        (device.GetAttrStr("fabricPathDn") != obj.GetAttrStr("fabricPathDn")) ||
1✔
1627
                                        (device.GetAttrStr("state") != obj.GetAttrStr("state")) {
2✔
1628
                                        cont.fabricPathLogger(node, obj).
1✔
1629
                                                Debug("Updating opflex device")
1✔
1630

1✔
1631
                                        devices = append(append(devices[:i], devices[i+1:]...), obj)
1✔
1632
                                        cont.nodeOpflexDevice[node] = devices
1✔
1633
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1634
                                        break
1✔
1635
                                }
1636
                        }
1637
                        if !found && obj.GetAttrStr("hostName") == node {
2✔
1638
                                cont.fabricPathLogger(node, obj).
1✔
1639
                                        Debug("Appending opflex device")
1✔
1640

1✔
1641
                                devices = append(devices, obj)
1✔
1642
                                cont.nodeOpflexDevice[node] = devices
1✔
1643
                                nodeUpdates = append(nodeUpdates, node)
1✔
1644
                        }
1✔
1645
                }
1646
                if !nodefound {
2✔
1647
                        node := obj.GetAttrStr("hostName")
1✔
1648
                        cont.fabricPathLogger(node, obj).Debug("Adding opflex device")
1✔
1649
                        cont.nodeOpflexDevice[node] = apicapi.ApicSlice{obj}
1✔
1650
                        nodeUpdates = append(nodeUpdates, node)
1✔
1651
                }
1✔
1652
                cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", cont.nodeOpflexDevice[obj.GetAttrStr("hostName")])
1✔
1653
                cont.indexMutex.Unlock()
1✔
1654

1✔
1655
                for _, node := range nodeUpdates {
2✔
1656
                        cont.env.NodeServiceChanged(node)
1✔
1657
                        cont.erspanSyncOpflexDev()
1✔
1658
                }
1✔
1659
                cont.updateDeviceCluster()
1✔
1660
        }
1661
}
1662

1663
func (cont *AciController) postOpflexDeviceDelete(nodes []string) {
1✔
1664
        cont.updateDeviceCluster()
1✔
1665
        for _, node := range nodes {
2✔
1666
                cont.env.NodeServiceChanged(node)
1✔
1667
                cont.erspanSyncOpflexDev()
1✔
1668
        }
1✔
1669
}
1670

1671
func (cont *AciController) opflexDeviceDeleted(dn string) {
1✔
1672
        var nodeUpdates []string
1✔
1673
        var dnFound bool //to check if the dn belongs to this cluster
1✔
1674
        cont.log.Info("Processing opflex device delete notification of ", dn)
1✔
1675
        cont.indexMutex.Lock()
1✔
1676
        for node, devices := range cont.nodeOpflexDevice {
2✔
1677
                for i, device := range devices {
2✔
1678
                        if device.GetDn() != dn {
2✔
1679
                                continue
1✔
1680
                        }
1681
                        dnFound = true
1✔
1682
                        cont.fabricPathLogger(node, device).
1✔
1683
                                Debug("Deleting opflex device path")
1✔
1684
                        devices = append(devices[:i], devices[i+1:]...)
1✔
1685
                        cont.nodeOpflexDevice[node] = devices
1✔
1686
                        cont.log.Info("Deleted opflex device of node ", node, ": ", dn)
1✔
1687
                        nodeUpdates = append(nodeUpdates, node)
1✔
1688
                        break
1✔
1689
                }
1690
                if len(devices) == 0 {
2✔
1691
                        delete(cont.nodeOpflexDevice, node)
1✔
1692
                }
1✔
1693
        }
1694

1695
        // For clusters other than OpenShift On OpenStack,
1696
        // openStackFabricPathDnMap will be empty
1697
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1698
                if _, ok := opflexOdevInfo.opflexODevDn[dn]; ok {
×
1699
                        cont.log.Info("Received OpenStack opflexODev delete notification for ", dn)
×
1700
                        delete(opflexOdevInfo.opflexODevDn, dn)
×
1701
                        if len(opflexOdevInfo.opflexODevDn) < 1 {
×
1702
                                delete(cont.openStackFabricPathDnMap, host)
×
1703
                                cont.log.Info("OpenStack opflexODev of host ", host, " is deleted from cache")
×
1704
                                dnFound = true
×
1705
                        } else {
×
1706
                                cont.openStackFabricPathDnMap[host] = opflexOdevInfo
×
1707
                        }
×
1708
                        break
×
1709
                }
1710
        }
1711
        cont.indexMutex.Unlock()
1✔
1712

1✔
1713
        if dnFound {
2✔
1714
                cont.postOpflexDeviceDelete(nodeUpdates)
1✔
1715
        }
1✔
1716
}
1717

1718
func (cont *AciController) writeApicSvc(key string, service *v1.Service) {
1✔
1719
        if cont.isCNOEnabled() {
1✔
1720
                return
×
1721
        }
×
1722
        aobj := apicapi.NewVmmInjectedSvc(cont.vmmDomainProvider(),
1✔
1723
                cont.config.AciVmmDomain, cont.config.AciVmmController,
1✔
1724
                service.Namespace, service.Name)
1✔
1725
        aobjDn := aobj.GetDn()
1✔
1726
        aobj.SetAttr("guid", string(service.UID))
1✔
1727

1✔
1728
        svcns := service.ObjectMeta.Namespace
1✔
1729
        _, exists, err := cont.namespaceIndexer.GetByKey(svcns)
1✔
1730
        if err != nil {
1✔
1731
                cont.log.Error("Failed to lookup ns : ", svcns, " ", err)
×
1732
                return
×
1733
        }
×
1734
        if !exists {
2✔
1735
                cont.log.Debug("Namespace of service ", service.ObjectMeta.Name, ": ", svcns, " doesn't exist, hence not sending an update to the APIC")
1✔
1736
                return
1✔
1737
        }
1✔
1738

1739
        if !cont.serviceEndPoints.SetServiceApicObject(aobj, service) {
2✔
1740
                return
1✔
1741
        }
1✔
1742
        var setApicSvcDnsName bool
1✔
1743
        if len(cont.config.ApicHosts) != 0 && apicapi.ApicVersion >= "5.1" {
1✔
1744
                setApicSvcDnsName = true
×
1745
        }
×
1746
        // APIC model only allows one of these
1747
        for _, ingress := range service.Status.LoadBalancer.Ingress {
1✔
1748
                if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1749
                        aobj.SetAttr("lbIp", ingress.IP)
×
1750
                } else if ingress.Hostname != "" {
×
1751
                        ipList, err := net.LookupHost(ingress.Hostname)
×
1752
                        if err == nil && len(ipList) > 0 {
×
1753
                                aobj.SetAttr("lbIp", ipList[0])
×
1754
                        } else {
×
1755
                                cont.log.Errorf("Lookup: err: %v, ipList: %+v", err, ipList)
×
1756
                        }
×
1757
                }
1758
                break
×
1759
        }
1760
        if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != "None" {
2✔
1761
                aobj.SetAttr("clusterIp", service.Spec.ClusterIP)
1✔
1762
        }
1✔
1763

1764
        var t string
1✔
1765
        switch service.Spec.Type {
1✔
1766
        case v1.ServiceTypeClusterIP:
×
1767
                t = "clusterIp"
×
1768
        case v1.ServiceTypeNodePort:
×
1769
                t = "nodePort"
×
1770
        case v1.ServiceTypeLoadBalancer:
1✔
1771
                t = "loadBalancer"
1✔
1772
        case v1.ServiceTypeExternalName:
×
1773
                t = "externalName"
×
1774
        }
1775
        if t != "" {
2✔
1776
                aobj.SetAttr("type", t)
1✔
1777
        }
1✔
1778

1779
        if setApicSvcDnsName || cont.config.Flavor == "k8s-overlay" {
1✔
1780
                dnsName := fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace)
×
1781

×
1782
                for _, ingress := range service.Status.LoadBalancer.Ingress {
×
1783
                        if ingress.Hostname != "" {
×
1784
                                aobj.SetAttr("dnsName", ingress.Hostname)
×
1785
                        } else if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1786
                                aobj.SetAttr("dnsName", dnsName)
×
1787
                        }
×
1788
                }
1789
                if t == "clusterIp" || t == "nodePort" || t == "externalName" {
×
1790
                        aobj.SetAttr("dnsName", dnsName)
×
1791
                }
×
1792
        }
1793
        for _, port := range service.Spec.Ports {
2✔
1794
                proto := getProtocolStr(port.Protocol)
1✔
1795
                p := apicapi.NewVmmInjectedSvcPort(aobjDn,
1✔
1796
                        strconv.Itoa(int(port.Port)), proto, port.TargetPort.String())
1✔
1797
                p.SetAttr("nodePort", strconv.Itoa(int(port.NodePort)))
1✔
1798
                aobj.AddChild(p)
1✔
1799
        }
1✔
1800
        if cont.config.EnableVmmInjectedLabels && service.ObjectMeta.Labels != nil && apicapi.ApicVersion >= "5.2" {
1✔
1801
                for key, val := range service.ObjectMeta.Labels {
×
1802
                        newLabelKey := cont.aciNameForKey("label", key)
×
1803
                        label := apicapi.NewVmmInjectedLabel(aobj.GetDn(),
×
1804
                                newLabelKey, val)
×
1805
                        aobj.AddChild(label)
×
1806
                }
×
1807
        }
1808
        name := cont.aciNameForKey("service-vmm", key)
1✔
1809
        cont.log.Debug("Write Service Object: ", aobj)
1✔
1810
        cont.apicConn.WriteApicObjects(name, apicapi.ApicSlice{aobj})
1✔
1811
        cont.log.Debugf("svcObject: %+v", aobj)
1✔
1812
}
1813

1814
func removeAllConditions(conditions []metav1.Condition, conditionType string) []metav1.Condition {
1✔
1815
        i := 0
1✔
1816
        for _, cond := range conditions {
1✔
1817
                if cond.Type != conditionType {
×
1818
                        conditions[i] = cond
×
1819
                }
×
1820
        }
1821
        return conditions[:i]
1✔
1822
}
1823

1824
func (cont *AciController) updateServiceCondition(service *v1.Service, success bool, reason string, message string) bool {
1✔
1825
        conditionType := "LbIpamAllocation"
1✔
1826

1✔
1827
        var condition metav1.Condition
1✔
1828
        if success {
2✔
1829
                condition.Status = metav1.ConditionTrue
1✔
1830
        } else {
2✔
1831
                condition.Status = metav1.ConditionFalse
1✔
1832
                condition.Message = message
1✔
1833
        }
1✔
1834
        condition.Type = conditionType
1✔
1835
        condition.Reason = reason
1✔
1836
        condition.LastTransitionTime = metav1.Time{time.Now()}
1✔
1837
        for _, cond := range service.Status.Conditions {
2✔
1838
                if cond.Type == conditionType &&
1✔
1839
                        cond.Status == condition.Status &&
1✔
1840
                        cond.Message == condition.Message &&
1✔
1841
                        cond.Reason == condition.Reason {
2✔
1842
                        return false
1✔
1843
                }
1✔
1844
        }
1845

1846
        service.Status.Conditions = removeAllConditions(service.Status.Conditions, conditionType)
1✔
1847
        service.Status.Conditions = append(service.Status.Conditions, condition)
1✔
1848
        return true
1✔
1849
}
1850

1851
func (cont *AciController) validateRequestedIps(lbIpList []string) (net.IP, net.IP, bool) {
1✔
1852
        var ipv4, ipv6 net.IP
1✔
1853
        for _, lbIp := range lbIpList {
2✔
1854
                ip := net.ParseIP(lbIp)
1✔
1855
                if ip != nil {
2✔
1856
                        if ip.To4() != nil {
2✔
1857
                                if ipv4.Equal(net.IP{}) {
2✔
1858
                                        ipv4 = ip
1✔
1859
                                } else {
2✔
1860
                                        cont.log.Error("Annotation should have only one ipv4")
1✔
1861
                                        return ipv4, ipv6, false
1✔
1862
                                }
1✔
1863
                        } else if ip.To16() != nil {
2✔
1864
                                if ipv6.Equal(net.IP{}) {
2✔
1865
                                        ipv6 = ip
1✔
1866
                                } else {
2✔
1867
                                        cont.log.Error("Annotation should have only one ipv6")
1✔
1868
                                        return ipv4, ipv6, false
1✔
1869
                                }
1✔
1870
                        }
1871
                }
1872
        }
1873
        return ipv4, ipv6, true
1✔
1874
}
1875

1876
func (cont *AciController) returnUnusedStaticIngressIps(staticIngressIps, requestedIps []net.IP) {
1✔
1877
        for _, staticIp := range staticIngressIps {
2✔
1878
                found := false
1✔
1879
                for _, reqIp := range requestedIps {
2✔
1880
                        if reqIp.Equal(staticIp) {
2✔
1881
                                found = true
1✔
1882
                        }
1✔
1883
                }
1884
                if !found {
2✔
1885
                        returnIps(cont.staticServiceIps, []net.IP{staticIp})
1✔
1886
                }
1✔
1887
        }
1888
}
1889

1890
func (cont *AciController) allocateServiceIps(servicekey string,
1891
        service *v1.Service) bool {
1✔
1892
        logger := serviceLogger(cont.log, service)
1✔
1893
        cont.indexMutex.Lock()
1✔
1894
        meta, ok := cont.serviceMetaCache[servicekey]
1✔
1895
        if !ok {
2✔
1896
                meta = &serviceMeta{}
1✔
1897
                cont.serviceMetaCache[servicekey] = meta
1✔
1898

1✔
1899
                // Read any existing IPs and attempt to allocate them to the pod
1✔
1900
                for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1901
                        ip := net.ParseIP(ingress.IP)
1✔
1902
                        if ip == nil {
1✔
1903
                                continue
×
1904
                        }
1905
                        if ip.To4() != nil {
2✔
1906
                                if cont.serviceIps.GetV4IpCache()[0].RemoveIp(ip) {
2✔
1907
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1908
                                } else if cont.staticServiceIps.V4.RemoveIp(ip) {
3✔
1909
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1910
                                }
1✔
1911
                        } else if ip.To16() != nil {
2✔
1912
                                if cont.serviceIps.GetV6IpCache()[0].RemoveIp(ip) {
2✔
1913
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1914
                                } else if cont.staticServiceIps.V6.RemoveIp(ip) {
3✔
1915
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1916
                                }
1✔
1917
                        }
1918
                }
1919
        }
1920

1921
        if !cont.serviceSyncEnabled {
2✔
1922
                cont.indexMutex.Unlock()
1✔
1923
                return false
1✔
1924
        }
1✔
1925

1926
        var requestedIps []net.IP
1✔
1927
        // try to give the requested load balancer IP to the pod
1✔
1928
        lbIps, ok := service.ObjectMeta.Annotations[metadata.LbIpAnnotation]
1✔
1929
        if ok {
2✔
1930
                lbIpList := strings.Split(lbIps, ",")
1✔
1931
                ipv4, ipv6, valid := cont.validateRequestedIps(lbIpList)
1✔
1932
                if valid {
2✔
1933
                        if ipv4 != nil {
2✔
1934
                                requestedIps = append(requestedIps, ipv4)
1✔
1935
                        }
1✔
1936
                        if ipv6 != nil {
2✔
1937
                                requestedIps = append(requestedIps, ipv6)
1✔
1938
                        }
1✔
1939
                } else {
1✔
1940
                        cont.returnServiceIps(meta.ingressIps)
1✔
1941
                        cont.log.Error("Invalid LB IP annotation for service ", servicekey)
1✔
1942
                        condUpdated := cont.updateServiceCondition(service, false, "InvalidAnnotation", "Invalid Loadbalancer IP annotation")
1✔
1943
                        if condUpdated {
2✔
1944
                                _, err := cont.updateServiceStatus(service)
1✔
1945
                                if err != nil {
1✔
1946
                                        logger.Error("Failed to update service status : ", err)
×
1947
                                        cont.indexMutex.Unlock()
×
1948
                                        return true
×
1949
                                }
×
1950
                        }
1951
                        cont.indexMutex.Unlock()
1✔
1952
                        return false
1✔
1953
                }
1954
        } else {
1✔
1955
                requestedIp := net.ParseIP(service.Spec.LoadBalancerIP)
1✔
1956
                if requestedIp != nil {
2✔
1957
                        requestedIps = append(requestedIps, requestedIp)
1✔
1958
                }
1✔
1959
        }
1960
        if len(requestedIps) > 0 {
2✔
1961
                meta.requestedIps = []net.IP{}
1✔
1962
                for _, requestedIp := range requestedIps {
2✔
1963
                        hasRequestedIp := false
1✔
1964
                        for _, ip := range meta.staticIngressIps {
2✔
1965
                                if reflect.DeepEqual(requestedIp, ip) {
2✔
1966
                                        hasRequestedIp = true
1✔
1967
                                }
1✔
1968
                        }
1969
                        if !hasRequestedIp {
2✔
1970
                                if requestedIp.To4() != nil &&
1✔
1971
                                        cont.staticServiceIps.V4.RemoveIp(requestedIp) {
2✔
1972
                                        hasRequestedIp = true
1✔
1973
                                } else if requestedIp.To16() != nil &&
2✔
1974
                                        cont.staticServiceIps.V6.RemoveIp(requestedIp) {
2✔
1975
                                        hasRequestedIp = true
1✔
1976
                                }
1✔
1977
                        }
1978
                        if hasRequestedIp {
2✔
1979
                                meta.requestedIps = append(meta.requestedIps, requestedIp)
1✔
1980
                        }
1✔
1981
                }
1982
                cont.returnUnusedStaticIngressIps(meta.staticIngressIps, meta.requestedIps)
1✔
1983
                meta.staticIngressIps = meta.requestedIps
1✔
1984
                cont.returnServiceIps(meta.ingressIps)
1✔
1985
                meta.ingressIps = nil
1✔
1986
                // If no requested ips are allocatable
1✔
1987
                if len(meta.requestedIps) < 1 {
2✔
1988
                        logger.Error("No Requested Ip addresses available for service ", servicekey)
1✔
1989
                        condUpdated := cont.updateServiceCondition(service, false, "RequestedIpsNotAllocatable", "The requested ips for loadbalancer service are not available or not in extern static range")
1✔
1990
                        if condUpdated {
2✔
1991
                                _, err := cont.updateServiceStatus(service)
1✔
1992
                                if err != nil {
1✔
1993
                                        cont.indexMutex.Unlock()
×
1994
                                        logger.Error("Failed to update service status: ", err)
×
1995
                                        return true
×
1996
                                }
×
1997
                        }
1998
                        cont.indexMutex.Unlock()
1✔
1999
                        return false
1✔
2000
                }
2001
        } else if len(meta.requestedIps) > 0 {
1✔
2002
                meta.requestedIps = nil
×
2003
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
×
2004
                meta.staticIngressIps = nil
×
2005
        }
×
2006
        ingressIps := make([]net.IP, 0)
1✔
2007
        ingressIps = append(ingressIps, meta.ingressIps...)
1✔
2008
        ingressIps = append(ingressIps, meta.staticIngressIps...)
1✔
2009

1✔
2010
        var ipv4, ipv6 net.IP
1✔
2011
        for _, ip := range ingressIps {
2✔
2012
                if ip.To4() != nil {
2✔
2013
                        ipv4 = ip
1✔
2014
                } else if ip.To16() != nil {
3✔
2015
                        ipv6 = ip
1✔
2016
                }
1✔
2017
        }
2018
        var clusterIPv4, clusterIPv6 net.IP
1✔
2019
        clusterIPs := append([]string{service.Spec.ClusterIP}, service.Spec.ClusterIPs...)
1✔
2020
        for _, ipStr := range clusterIPs {
2✔
2021
                ip := net.ParseIP(ipStr)
1✔
2022
                if ip == nil {
1✔
2023
                        continue
×
2024
                }
2025
                if ip.To4() != nil && clusterIPv4 == nil {
2✔
2026
                        clusterIPv4 = ip
1✔
2027
                } else if ip.To16() != nil && strings.Contains(ip.String(), ":") && clusterIPv6 == nil {
3✔
2028
                        clusterIPv6 = ip
1✔
2029
                }
1✔
2030
        }
2031
        if clusterIPv4 != nil && ipv4 == nil {
2✔
2032
                if len(requestedIps) < 1 {
2✔
2033
                        ipv4, _ = cont.serviceIps.AllocateIp(true)
1✔
2034
                        if ipv4 != nil {
2✔
2035
                                ingressIps = append(ingressIps, ipv4)
1✔
2036
                        }
1✔
2037
                }
2038
        } else if clusterIPv4 == nil && ipv4 != nil {
1✔
2039
                cont.removeIpFromIngressIPList(&ingressIps, ipv4)
×
2040
        }
×
2041

2042
        if clusterIPv6 != nil && ipv6 == nil {
2✔
2043
                if len(requestedIps) < 1 {
2✔
2044
                        ipv6, _ = cont.serviceIps.AllocateIp(false)
1✔
2045
                        if ipv6 != nil {
2✔
2046
                                ingressIps = append(ingressIps, ipv6)
1✔
2047
                        }
1✔
2048
                }
2049
        } else if clusterIPv6 == nil && ipv6 != nil {
1✔
2050
                cont.removeIpFromIngressIPList(&ingressIps, ipv6)
×
2051
        }
×
2052

2053
        if len(requestedIps) < 1 {
2✔
2054
                meta.ingressIps = ingressIps
1✔
2055
        }
1✔
2056
        if ipv4 == nil && ipv6 == nil {
2✔
2057
                logger.Error("No IP addresses available for service")
1✔
2058
                cont.indexMutex.Unlock()
1✔
2059
                return true
1✔
2060
        }
1✔
2061
        cont.indexMutex.Unlock()
1✔
2062
        var newIngress []v1.LoadBalancerIngress
1✔
2063
        for _, ip := range meta.ingressIps {
2✔
2064
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2065
        }
1✔
2066
        for _, ip := range meta.staticIngressIps {
2✔
2067
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2068
        }
1✔
2069

2070
        ipUpdated := false
1✔
2071
        if !reflect.DeepEqual(newIngress, service.Status.LoadBalancer.Ingress) {
2✔
2072
                service.Status.LoadBalancer.Ingress = newIngress
1✔
2073

1✔
2074
                logger.WithFields(logrus.Fields{
1✔
2075
                        "status": service.Status.LoadBalancer.Ingress,
1✔
2076
                }).Info("Updating service load balancer status")
1✔
2077

1✔
2078
                ipUpdated = true
1✔
2079
        }
1✔
2080

2081
        success := true
1✔
2082
        reason := "Success"
1✔
2083
        message := ""
1✔
2084
        if len(requestedIps) > 0 && len(requestedIps) != len(meta.staticIngressIps) {
1✔
2085
                success = false
×
2086
                reason = "OneIpNotAllocatable"
×
2087
                message = "One of the requested Ips is not allocatable"
×
2088
        }
×
2089
        condUpdated := cont.updateServiceCondition(service, success, reason, message)
1✔
2090
        if ipUpdated || condUpdated {
2✔
2091
                _, err := cont.updateServiceStatus(service)
1✔
2092
                if err != nil {
1✔
2093
                        logger.Error("Failed to update service status: ", err)
×
2094
                        return true
×
2095
                }
×
2096
        }
2097
        return false
1✔
2098
}
2099

2100
func (cont *AciController) handleServiceDelete(servicekey string) bool {
1✔
2101
        if cont.isCNOEnabled() {
1✔
2102
                return false
×
2103
        }
×
2104
        cont.clearLbService(servicekey)
1✔
2105
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("service-vmm",
1✔
2106
                servicekey))
1✔
2107
        return false
1✔
2108
}
2109

2110
func (cont *AciController) handleServiceUpdate(service *v1.Service) bool {
1✔
2111
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2112
        if err != nil {
1✔
2113
                serviceLogger(cont.log, service).
×
2114
                        Error("Could not create service key: ", err)
×
2115
                return false
×
2116
        }
×
2117
        if cont.isCNOEnabled() {
1✔
2118
                return false
×
2119
        }
×
2120
        var requeue bool
1✔
2121
        isLoadBalancer := service.Spec.Type == v1.ServiceTypeLoadBalancer
1✔
2122
        if isLoadBalancer {
2✔
2123
                if *cont.config.AllocateServiceIps {
2✔
2124
                        requeue = cont.allocateServiceIps(servicekey, service)
1✔
2125
                }
1✔
2126
                // Check if any existing SNAT policy (with no explicit SnatIp)
2127
                // matches this service. This handles the case where the SNAT
2128
                // policy was created before the service existed.
2129
                cont.indexMutex.Lock()
1✔
2130
                if _, exists := cont.snatServices[servicekey]; !exists {
2✔
2131
                        for _, policy := range cont.snatPolicyCache {
1✔
2132
                                if len(policy.SnatIp) == 0 {
×
2133
                                        if len(policy.Selector.Namespace) == 0 ||
×
2134
                                                policy.Selector.Namespace == service.ObjectMeta.Namespace {
×
2135
                                                selector := labels.SelectorFromSet(labels.Set(policy.Selector.Labels))
×
2136
                                                if selector.Matches(labels.Set(service.ObjectMeta.Labels)) {
×
2137
                                                        cont.snatServices[servicekey] = true
×
2138
                                                        break
×
2139
                                                }
2140
                                        }
2141
                                }
2142
                        }
2143
                }
2144
                if cont.serviceSyncEnabled {
2✔
2145
                        cont.indexMutex.Unlock()
1✔
2146
                        err = cont.updateServiceDeviceInstance(servicekey, service)
1✔
2147
                        if err != nil {
1✔
2148
                                serviceLogger(cont.log, service).
×
2149
                                        Error("Failed to update service device Instance: ", err)
×
2150
                                return true
×
2151
                        }
×
2152
                } else {
1✔
2153
                        cont.indexMutex.Unlock()
1✔
2154
                }
1✔
2155
        } else {
1✔
2156
                cont.clearLbService(servicekey)
1✔
2157
        }
1✔
2158
        cont.writeApicSvc(servicekey, service)
1✔
2159
        return requeue
1✔
2160
}
2161

2162
func (cont *AciController) clearLbService(servicekey string) {
1✔
2163
        cont.indexMutex.Lock()
1✔
2164
        if meta, ok := cont.serviceMetaCache[servicekey]; ok {
2✔
2165
                cont.returnServiceIps(meta.ingressIps)
1✔
2166
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
1✔
2167
                delete(cont.serviceMetaCache, servicekey)
1✔
2168
                delete(cont.snatServices, servicekey)
1✔
2169
        }
1✔
2170
        cont.indexMutex.Unlock()
1✔
2171
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("svc", servicekey))
1✔
2172
}
2173

2174
func (cont *AciController) processServiceTargetPorts(service *v1.Service, svcKey string, old bool) map[string]targetPort {
1✔
2175
        ports := make(map[string]targetPort)
1✔
2176
        for _, port := range service.Spec.Ports {
2✔
2177
                var key string
1✔
2178
                portnums := make(map[int]map[string]bool)
1✔
2179

1✔
2180
                if port.TargetPort.Type == intstr.String {
2✔
2181
                        entry, exists := cont.namedPortServiceIndex[svcKey]
1✔
2182
                        if !old {
2✔
2183
                                if !exists {
2✔
2184
                                        cont.log.Debugf("Creating named port index for service: %s, port: %s", svcKey, port.Name)
1✔
2185
                                        newEntry := make(namedPortServiceIndexEntry)
1✔
2186
                                        entry = &newEntry
1✔
2187
                                }
1✔
2188
                                (*entry)[port.Name] = &namedPortServiceIndexPort{
1✔
2189
                                        targetPortName: port.TargetPort.String(),
1✔
2190
                                        resolvedPorts:  make(map[int]bool),
1✔
2191
                                }
1✔
2192
                                cont.namedPortServiceIndex[svcKey] = entry
1✔
2193
                        } else if exists {
2✔
2194
                                delete(*entry, port.Name)
1✔
2195
                                cont.log.Debugf("Removed named port index for service: %s port: %s, entry: %v", svcKey, port.Name, entry)
1✔
2196
                                if len(*entry) == 0 {
2✔
2197
                                        delete(cont.namedPortServiceIndex, svcKey)
1✔
2198
                                } else {
2✔
2199
                                        cont.namedPortServiceIndex[svcKey] = entry
1✔
2200
                                }
1✔
2201
                        }
2202
                        key = portProto(&port.Protocol) + "-name-" + port.TargetPort.String()
1✔
2203
                } else {
1✔
2204
                        portNum := port.TargetPort.IntValue()
1✔
2205
                        if portNum <= 0 {
2✔
2206
                                portNum = int(port.Port)
1✔
2207
                        }
1✔
2208
                        key = portProto(&port.Protocol) + "-num-" + strconv.Itoa(portNum)
1✔
2209
                        portnums[portNum] = map[string]bool{svcKey: true}
1✔
2210
                }
2211

2212
                ports[key] = targetPort{
1✔
2213
                        proto:          port.Protocol,
1✔
2214
                        portServiceMap: portnums,
1✔
2215
                }
1✔
2216
        }
2217
        return ports
1✔
2218
}
2219

2220
func (cont *AciController) serviceAdded(obj interface{}) {
1✔
2221
        service := obj.(*v1.Service)
1✔
2222
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2223
        if err != nil {
1✔
2224
                serviceLogger(cont.log, service).
×
2225
                        Error("Could not create service key: ", err)
×
2226
                return
×
2227
        }
×
2228

2229
        cont.indexMutex.Lock()
1✔
2230
        ports := cont.processServiceTargetPorts(service, servicekey, false)
1✔
2231
        cont.queuePortNetPolUpdates(ports)
1✔
2232
        cont.updateTargetPortIndex(true, servicekey, nil, ports)
1✔
2233
        cont.indexMutex.Unlock()
1✔
2234

1✔
2235
        cont.queueServiceUpdateByKey(servicekey)
1✔
2236
}
2237

2238
func (cont *AciController) serviceUpdated(oldSvc, newSvc interface{}) {
1✔
2239
        oldservice := oldSvc.(*v1.Service)
1✔
2240
        newservice := newSvc.(*v1.Service)
1✔
2241
        servicekey, err := cache.MetaNamespaceKeyFunc(newservice)
1✔
2242
        if err != nil {
1✔
2243
                serviceLogger(cont.log, newservice).
×
2244
                        Error("Could not create service key: ", err)
×
2245
                return
×
2246
        }
×
2247
        if !reflect.DeepEqual(oldservice.Spec.Ports, newservice.Spec.Ports) {
1✔
2248
                cont.indexMutex.Lock()
×
2249
                oldPorts := cont.processServiceTargetPorts(oldservice, servicekey, true)
×
2250
                newPorts := cont.processServiceTargetPorts(newservice, servicekey, false)
×
2251
                cont.queuePortNetPolUpdates(oldPorts)
×
2252
                cont.updateTargetPortIndex(true, servicekey, oldPorts, newPorts)
×
2253
                cont.queuePortNetPolUpdates(newPorts)
×
2254
                cont.indexMutex.Unlock()
×
2255
        }
×
2256
        cont.queueServiceUpdateByKey(servicekey)
1✔
2257
}
2258

2259
func (cont *AciController) serviceDeleted(obj interface{}) {
1✔
2260
        service, isService := obj.(*v1.Service)
1✔
2261
        if !isService {
1✔
2262
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2263
                if !ok {
×
2264
                        serviceLogger(cont.log, service).
×
2265
                                Error("Received unexpected object: ", obj)
×
2266
                        return
×
2267
                }
×
2268
                service, ok = deletedState.Obj.(*v1.Service)
×
2269
                if !ok {
×
2270
                        serviceLogger(cont.log, service).
×
2271
                                Error("DeletedFinalStateUnknown contained non-Services object: ", deletedState.Obj)
×
2272
                        return
×
2273
                }
×
2274
        }
2275
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2276
        if err != nil {
1✔
2277
                serviceLogger(cont.log, service).
×
2278
                        Error("Could not create service key: ", err)
×
2279
                return
×
2280
        }
×
2281

2282
        cont.indexMutex.Lock()
1✔
2283
        ports := cont.processServiceTargetPorts(service, servicekey, true)
1✔
2284
        cont.updateTargetPortIndex(true, servicekey, ports, nil)
1✔
2285
        cont.queuePortNetPolUpdates(ports)
1✔
2286
        cont.indexMutex.Unlock()
1✔
2287

1✔
2288
        deletedServiceKey := "DELETED_" + servicekey
1✔
2289
        cont.queueServiceUpdateByKey(deletedServiceKey)
1✔
2290
}
2291

2292
func (cont *AciController) serviceFullSync() {
1✔
2293
        cache.ListAll(cont.serviceIndexer, labels.Everything(),
1✔
2294
                func(sobj interface{}) {
2✔
2295
                        cont.queueServiceUpdate(sobj.(*v1.Service))
1✔
2296
                })
1✔
2297
}
2298

2299
func (cont *AciController) getEndpointSliceIps(endpointSlice *discovery.EndpointSlice) map[string]bool {
1✔
2300
        ips := make(map[string]bool)
1✔
2301
        for _, endpoints := range endpointSlice.Endpoints {
2✔
2302
                for _, addr := range endpoints.Addresses {
2✔
2303
                        ips[addr] = true
1✔
2304
                }
1✔
2305
        }
2306
        return ips
1✔
2307
}
2308

2309
func (cont *AciController) notReadyEndpointPresent(endpointSlice *discovery.EndpointSlice) bool {
×
2310
        for _, endpoints := range endpointSlice.Endpoints {
×
2311
                if (endpoints.Conditions.Ready != nil && !*endpoints.Conditions.Ready) &&
×
2312
                        (endpoints.Conditions.Terminating == nil || !*endpoints.Conditions.Terminating) {
×
2313
                        return true
×
2314
                }
×
2315
        }
2316
        return false
×
2317
}
2318

2319
func (cont *AciController) getEndpointSliceEpIps(endpoints *discovery.Endpoint) map[string]bool {
×
2320
        ips := make(map[string]bool)
×
2321
        for _, addr := range endpoints.Addresses {
×
2322
                ips[addr] = true
×
2323
        }
×
2324
        return ips
×
2325
}
2326

2327
func (cont *AciController) processDelayedEpSlices() {
1✔
2328
        var processEps []DelayedEpSlice
1✔
2329
        cont.indexMutex.Lock()
1✔
2330
        for i := 0; i < len(cont.delayedEpSlices); i++ {
1✔
2331
                delayedepslice := cont.delayedEpSlices[i]
×
2332
                if time.Now().After(delayedepslice.DelayedTime) {
×
2333
                        var toprocess DelayedEpSlice
×
2334
                        err := util.DeepCopyObj(&delayedepslice, &toprocess)
×
2335
                        if err != nil {
×
2336
                                cont.log.Error(err)
×
2337
                                continue
×
2338
                        }
2339
                        processEps = append(processEps, toprocess)
×
2340
                        cont.delayedEpSlices = append(cont.delayedEpSlices[:i], cont.delayedEpSlices[i+1:]...)
×
2341
                }
2342
        }
2343

2344
        cont.indexMutex.Unlock()
1✔
2345
        for _, epslice := range processEps {
1✔
2346
                //ignore the epslice if newly added endpoint is not ready
×
2347
                if cont.notReadyEndpointPresent(epslice.NewEpSlice) {
×
2348
                        cont.log.Debug("Ignoring the update as the new endpoint is not ready : ", epslice.NewEpSlice)
×
2349
                } else {
×
2350
                        cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice)
×
2351
                        cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice)
×
2352
                }
×
2353
        }
2354
}
2355

2356
func (cont *AciController) updateTargetPortIndexEpSlice(key string, port discovery.EndpointPort, serviceKey string, old bool) {
1✔
2357
        portNum := int(*port.Port)
1✔
2358

1✔
2359
        if old {
2✔
2360
                entry := cont.targetPortIndex[key]
1✔
2361
                if entry != nil {
2✔
2362
                        if svcKeys := entry.portMapping.portServiceMap[portNum]; svcKeys != nil {
2✔
2363
                                delete(svcKeys, serviceKey)
1✔
2364
                                if len(svcKeys) == 0 {
2✔
2365
                                        delete(entry.portMapping.portServiceMap, portNum)
1✔
2366
                                }
1✔
2367
                        }
2368
                        if !entry.hasServiceKeys() && len(entry.networkPolicyKeys) == 0 {
2✔
2369
                                delete(cont.targetPortIndex, key)
1✔
2370
                        }
1✔
2371
                }
2372
        } else {
1✔
2373
                entry := cont.targetPortIndex[key]
1✔
2374
                if entry == nil {
2✔
2375
                        proto := v1.ProtocolTCP
1✔
2376
                        if port.Protocol != nil {
2✔
2377
                                proto = *port.Protocol
1✔
2378
                        }
1✔
2379
                        entry = &portIndexEntry{
1✔
2380
                                portMapping: targetPort{
1✔
2381
                                        proto:          proto,
1✔
2382
                                        portServiceMap: make(map[int]map[string]bool),
1✔
2383
                                },
1✔
2384
                                networkPolicyKeys: make(map[string]bool),
1✔
2385
                        }
1✔
2386
                        cont.targetPortIndex[key] = entry
1✔
2387
                }
2388
                if entry.portMapping.portServiceMap[portNum] == nil {
2✔
2389
                        entry.portMapping.portServiceMap[portNum] = make(map[string]bool)
1✔
2390
                }
1✔
2391
                entry.portMapping.portServiceMap[portNum][serviceKey] = true
1✔
2392
        }
2393
}
2394

2395
// epSlicePortInfo captures per-key metadata needed for diffing old vs new
// endpoint slice port entries in resolveServiceNamedPortFromEpSlice.
type epSlicePortInfo struct {
        // port is the endpoint slice port the key was derived from.
        port        discovery.EndpointPort
        // portNum is the numeric value of port.Port.
        portNum     int
        svcPortName string // non-empty when the entry tracks a named service port in namedPortServiceIndex
}
2402

2403
// getEpSlicePortKeys computes the set of targetPortIndex keys that an
2404
// endpoint slice would produce, keyed by targetPortIndex key string.
2405
func (cont *AciController) getEpSlicePortKeys(epSlice *discovery.EndpointSlice, serviceKey string, hasIndex bool) map[string]*epSlicePortInfo {
1✔
2406
        if epSlice == nil {
2✔
2407
                return nil
1✔
2408
        }
1✔
2409
        result := make(map[string]*epSlicePortInfo)
1✔
2410

1✔
2411
        var containerPortNames map[int]string
1✔
2412
        for i := range epSlice.Endpoints {
2✔
2413
                ep := &epSlice.Endpoints[i]
1✔
2414
                if ep.TargetRef != nil && ep.TargetRef.Kind == "Pod" {
2✔
2415
                        podObj, exists, err := cont.podIndexer.GetByKey(
1✔
2416
                                ep.TargetRef.Namespace + "/" + ep.TargetRef.Name)
1✔
2417
                        if exists && err == nil {
2✔
2418
                                pod := podObj.(*v1.Pod)
1✔
2419
                                containerPortNames = make(map[int]string)
1✔
2420
                                for ci := range pod.Spec.Containers {
2✔
2421
                                        for _, cp := range pod.Spec.Containers[ci].Ports {
2✔
2422
                                                if cp.Name != "" {
2✔
2423
                                                        containerPortNames[int(cp.ContainerPort)] = cp.Name
1✔
2424
                                                }
1✔
2425
                                        }
2426
                                }
2427
                                break
1✔
2428
                        }
2429
                }
2430
        }
2431

2432
        for _, port := range epSlice.Ports {
2✔
2433
                if port.Port == nil {
1✔
2434
                        continue
×
2435
                }
2436
                portNum := int(*port.Port)
1✔
2437

1✔
2438
                if hasIndex && port.Name != nil {
2✔
2439
                        key := portProto(port.Protocol) + "-num-" + strconv.Itoa(portNum)
1✔
2440
                        result[key] = &epSlicePortInfo{
1✔
2441
                                port:        port,
1✔
2442
                                portNum:     portNum,
1✔
2443
                                svcPortName: *port.Name,
1✔
2444
                        }
1✔
2445
                }
1✔
2446

2447
                if cpName, found := containerPortNames[portNum]; found {
2✔
2448
                        key := portProto(port.Protocol) + "-name-" + cpName
1✔
2449
                        result[key] = &epSlicePortInfo{
1✔
2450
                                port:    port,
1✔
2451
                                portNum: portNum,
1✔
2452
                        }
1✔
2453
                }
1✔
2454
        }
2455
        return result
1✔
2456
}
2457

2458
func (cont *AciController) resolveServiceNamedPortFromEpSlice(oldEpSlice, newEpSlice *discovery.EndpointSlice, serviceKey string) {
1✔
2459
        indexEntry, hasIndex := cont.namedPortServiceIndex[serviceKey]
1✔
2460

1✔
2461
        oldKeys := cont.getEpSlicePortKeys(oldEpSlice, serviceKey, hasIndex)
1✔
2462
        newKeys := cont.getEpSlicePortKeys(newEpSlice, serviceKey, hasIndex)
1✔
2463

1✔
2464
        // Remove entries present in old but not in new
1✔
2465
        for key, info := range oldKeys {
2✔
2466
                if _, ok := newKeys[key]; ok {
1✔
NEW
2467
                        continue
×
2468
                }
2469
                if hasIndex && info.svcPortName != "" {
2✔
2470
                        if portEntry, ok := (*indexEntry)[info.svcPortName]; ok && portEntry != nil {
2✔
2471
                                delete(portEntry.resolvedPorts, info.portNum)
1✔
2472
                                cont.log.Debugf("Deleting port: %d from service %s resolved target port. Resolved ports: %v", info.portNum, serviceKey, portEntry.resolvedPorts)
1✔
2473
                        }
1✔
2474
                }
2475
                cont.updateTargetPortIndexEpSlice(key, info.port, serviceKey, true)
1✔
2476
        }
2477

2478
        // Add entries present in new but not in old
2479
        for key, info := range newKeys {
2✔
2480
                if _, ok := oldKeys[key]; ok {
1✔
NEW
2481
                        continue
×
2482
                }
2483
                if hasIndex && info.svcPortName != "" {
2✔
2484
                        if portEntry, ok := (*indexEntry)[info.svcPortName]; ok && portEntry != nil {
2✔
2485
                                portEntry.resolvedPorts[info.portNum] = true
1✔
2486
                                cont.log.Debugf("Adding port: %d to service %s resolved target port. Resolved ports: %v", info.portNum, serviceKey, portEntry.resolvedPorts)
1✔
2487
                        }
1✔
2488
                }
2489
                cont.updateTargetPortIndexEpSlice(key, info.port, serviceKey, false)
1✔
2490
        }
2491
}
2492

2493
func (cont *AciController) endpointSliceAdded(obj interface{}) {
1✔
2494
        endpointslice, ok := obj.(*discovery.EndpointSlice)
1✔
2495
        if !ok {
1✔
2496
                cont.log.Error("error processing Endpointslice object: ", obj)
×
2497
                return
×
2498
        }
×
2499
        servicekey, valid := getServiceKey(endpointslice)
1✔
2500
        if !valid {
1✔
2501
                return
×
2502
        }
×
2503
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2504
        cont.indexMutex.Lock()
1✔
2505
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2506
        cont.resolveServiceNamedPortFromEpSlice(nil, endpointslice, servicekey)
1✔
2507
        cont.queueIPNetPolUpdates(ips)
1✔
2508
        cont.indexMutex.Unlock()
1✔
2509

1✔
2510
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2511

1✔
2512
        cont.queueServiceUpdateByKey(servicekey)
1✔
2513
        cont.log.Info("EndPointSlice Object Added: ", servicekey)
1✔
2514
}
2515

2516
func (cont *AciController) endpointSliceDeleted(obj interface{}) {
1✔
2517
        endpointslice, isEndpointslice := obj.(*discovery.EndpointSlice)
1✔
2518
        if !isEndpointslice {
1✔
2519
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2520
                if !ok {
×
2521
                        cont.log.Error("Received unexpected object: ", obj)
×
2522
                        return
×
2523
                }
×
2524
                endpointslice, ok = deletedState.Obj.(*discovery.EndpointSlice)
×
2525
                if !ok {
×
2526
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpointslice object: ", deletedState.Obj)
×
2527
                        return
×
2528
                }
×
2529
        }
2530
        servicekey, valid := getServiceKey(endpointslice)
1✔
2531
        if !valid {
1✔
2532
                return
×
2533
        }
×
2534
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2535
        cont.indexMutex.Lock()
1✔
2536
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2537
        cont.resolveServiceNamedPortFromEpSlice(endpointslice, nil, servicekey)
1✔
2538
        cont.queueIPNetPolUpdates(ips)
1✔
2539
        cont.indexMutex.Unlock()
1✔
2540
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2541
        cont.queueServiceUpdateByKey(servicekey)
1✔
2542
}
2543

2544
// Checks if the given service is present in the user configured list of services
2545
// for pbr delay and if present, returns the servie specific delay if configured
2546
func (cont *AciController) svcInAddDelayList(name, ns string) (int, bool) {
×
2547
        for _, svc := range cont.config.ServiceGraphEndpointAddDelay.Services {
×
2548
                if svc.Name == name && svc.Namespace == ns {
×
2549
                        return svc.Delay, true
×
2550
                }
×
2551
        }
2552
        return 0, false
×
2553
}
2554

2555
// Check if the endpointslice update notification has any deletion of enpoint
2556
func (cont *AciController) isDeleteEndpointSlice(oldendpointslice, newendpointslice *discovery.EndpointSlice) bool {
×
2557
        del := false
×
2558

×
2559
        // if any endpoint is removed from endpontslice
×
2560
        if len(newendpointslice.Endpoints) < len(oldendpointslice.Endpoints) {
×
2561
                del = true
×
2562
        }
×
2563

2564
        if !del {
×
2565
                // if any one of the endpoint is in terminating state
×
2566
                for _, endpoint := range newendpointslice.Endpoints {
×
2567
                        if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
×
2568
                                del = true
×
2569
                                break
×
2570
                        }
2571
                }
2572
        }
2573
        if !del {
×
2574
                // if any one of endpoint moved from ready state to not-ready state
×
2575
                for ix := range oldendpointslice.Endpoints {
×
2576
                        oldips := cont.getEndpointSliceEpIps(&oldendpointslice.Endpoints[ix])
×
2577
                        for newIx := range newendpointslice.Endpoints {
×
2578
                                newips := cont.getEndpointSliceEpIps(&newendpointslice.Endpoints[newIx])
×
2579
                                if reflect.DeepEqual(oldips, newips) {
×
2580
                                        if (oldendpointslice.Endpoints[ix].Conditions.Ready != nil && *oldendpointslice.Endpoints[ix].Conditions.Ready) &&
×
2581
                                                (newendpointslice.Endpoints[newIx].Conditions.Ready != nil && !*newendpointslice.Endpoints[newIx].Conditions.Ready) {
×
2582
                                                del = true
×
2583
                                        }
×
2584
                                        break
×
2585
                                }
2586
                        }
2587
                }
2588
        }
2589
        return del
×
2590
}
2591

2592
func (cont *AciController) doendpointSliceUpdatedDelay(oldendpointslice *discovery.EndpointSlice,
2593
        newendpointslice *discovery.EndpointSlice) {
×
2594
        svc, ns, valid := getServiceNameAndNs(newendpointslice)
×
2595
        if !valid {
×
2596
                return
×
2597
        }
×
2598
        svckey, valid := getServiceKey(newendpointslice)
×
2599
        if !valid {
×
2600
                return
×
2601
        }
×
2602
        delay := cont.config.ServiceGraphEndpointAddDelay.Delay
×
2603
        svcDelay, exists := cont.svcInAddDelayList(svc, ns)
×
2604
        if svcDelay > 0 {
×
2605
                delay = svcDelay
×
2606
        }
×
2607
        delayedsvc := exists && delay > 0
×
2608
        if delayedsvc {
×
2609
                cont.log.Debug("Delay of ", delay, " seconds is applicable for svc :", svc, " in ns: ", ns)
×
2610
                var delayedepslice DelayedEpSlice
×
2611
                delayedepslice.OldEpSlice = oldendpointslice
×
2612
                delayedepslice.ServiceKey = svckey
×
2613
                delayedepslice.NewEpSlice = newendpointslice
×
2614
                currentTime := time.Now()
×
2615
                delayedepslice.DelayedTime = currentTime.Add(time.Duration(delay) * time.Second)
×
2616
                cont.indexMutex.Lock()
×
2617
                cont.delayedEpSlices = append(cont.delayedEpSlices, &delayedepslice)
×
2618
                cont.indexMutex.Unlock()
×
2619
        } else {
×
2620
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2621
        }
×
2622

2623
        if delayedsvc && cont.isDeleteEndpointSlice(oldendpointslice, newendpointslice) {
×
2624
                cont.log.Debug("Proceeding by ignoring delay as the update is due to delete of endpoint")
×
2625
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2626
        }
×
2627
}
2628

2629
func (cont *AciController) endpointSliceUpdated(oldobj, newobj interface{}) {
1✔
2630
        oldendpointslice, ok := oldobj.(*discovery.EndpointSlice)
1✔
2631
        if !ok {
1✔
2632
                cont.log.Error("error processing Endpointslice object: ", oldobj)
×
2633
                return
×
2634
        }
×
2635
        newendpointslice, ok := newobj.(*discovery.EndpointSlice)
1✔
2636
        if !ok {
1✔
2637
                cont.log.Error("error processing Endpointslice object: ", newobj)
×
2638
                return
×
2639
        }
×
2640
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2641
                cont.doendpointSliceUpdatedDelay(oldendpointslice, newendpointslice)
×
2642
        } else {
1✔
2643
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
1✔
2644
        }
1✔
2645
}
2646

2647
func (cont *AciController) doendpointSliceUpdated(oldendpointslice *discovery.EndpointSlice,
2648
        newendpointslice *discovery.EndpointSlice) {
1✔
2649
        servicekey, valid := getServiceKey(newendpointslice)
1✔
2650
        if !valid {
1✔
2651
                return
×
2652
        }
×
2653
        oldIps := cont.getEndpointSliceIps(oldendpointslice)
1✔
2654
        newIps := cont.getEndpointSliceIps(newendpointslice)
1✔
2655
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2656
                cont.indexMutex.Lock()
1✔
2657
                cont.resolveServiceNamedPortFromEpSlice(oldendpointslice, newendpointslice, servicekey)
1✔
2658
                cont.queueIPNetPolUpdates(oldIps)
1✔
2659
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2660
                cont.queueIPNetPolUpdates(newIps)
1✔
2661
                cont.indexMutex.Unlock()
1✔
2662
        }
1✔
2663

2664
        if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) {
2✔
2665
                cont.queueEndpointSliceNetPolUpdates(oldendpointslice)
1✔
2666
                cont.queueEndpointSliceNetPolUpdates(newendpointslice)
1✔
2667
        }
1✔
2668
        cont.log.Debug("EndPointSlice Object Update: ", servicekey)
1✔
2669
        cont.queueServiceUpdateByKey(servicekey)
1✔
2670
}
2671

2672
func (cont *AciController) queueEndpointSliceNetPolUpdates(endpointslice *discovery.EndpointSlice) {
1✔
2673
        for _, endpoint := range endpointslice.Endpoints {
2✔
2674
                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" ||
1✔
2675
                        endpoint.TargetRef.Namespace == "" || endpoint.TargetRef.Name == "" {
2✔
2676
                        continue
1✔
2677
                }
2678
                if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
1✔
2679
                        continue
×
2680
                }
2681
                podkey := endpoint.TargetRef.Namespace + "/" + endpoint.TargetRef.Name
1✔
2682
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
2683
                ps := make(map[string]bool)
1✔
2684
                for _, npkey := range npkeys {
1✔
UNCOV
2685
                        cont.queueNetPolUpdateByKey(npkey)
×
UNCOV
2686
                }
×
2687
                // Process if the  any matching namedport wildcard policy is present
2688
                // ignore np already processed policies
2689
                cont.queueMatchingNamedNp(ps, podkey)
1✔
2690
        }
2691
}
2692

2693
func getServiceKey(endPointSlice *discovery.EndpointSlice) (string, bool) {
1✔
2694
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
1✔
2695
        if !ok {
1✔
2696
                return "", false
×
2697
        }
×
2698
        return endPointSlice.ObjectMeta.Namespace + "/" + serviceName, true
1✔
2699
}
2700

2701
func getServiceNameAndNs(endPointSlice *discovery.EndpointSlice) (string, string, bool) {
×
2702
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
×
2703
        if !ok {
×
2704
                return "", "", false
×
2705
        }
×
2706
        return serviceName, endPointSlice.ObjectMeta.Namespace, true
×
2707
}
2708

2709
func (seps *serviceEndpointSlice) UpdateServicesForNode(nodename string) {
1✔
2710
        // 1. List all the endpointslice and check for matching nodename
1✔
2711
        // 2. if it matches trigger the Service update and mark it visited
1✔
2712
        cont := seps.cont
1✔
2713
        visited := make(map[string]bool)
1✔
2714
        cache.ListAll(cont.endpointSliceIndexer, labels.Everything(),
1✔
2715
                func(endpointSliceobj interface{}) {
2✔
2716
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2717
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2718
                                if endpoint.NodeName != nil && *endpoint.NodeName == nodename {
2✔
2719
                                        servicekey, valid := getServiceKey(endpointSlices)
1✔
2720
                                        if !valid {
1✔
2721
                                                return
×
2722
                                        }
×
2723
                                        if _, ok := visited[servicekey]; !ok {
2✔
2724
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2725
                                                visited[servicekey] = true
1✔
2726
                                                return
1✔
2727
                                        }
1✔
2728
                                }
2729
                        }
2730
                })
2731
}
2732
func (cont *AciController) setNodeMap(nodeMap map[string]*metadata.ServiceEndpoint, nodeName string) {
1✔
2733
        nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
2734
        if !ok {
1✔
2735
                return
×
2736
        }
×
2737
        _, ok = cont.fabricPathForNode(nodeName)
1✔
2738
        if !ok {
2✔
2739
                return
1✔
2740
        }
1✔
2741
        nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
2742
}
2743

2744
// 2 cases when epslices corresponding to given service is presnt in delayedEpSlices:
2745
//  1. endpoint not present in delayedEpSlices of the service
2746
//  2. endpoint present in delayedEpSlices of the service but in not ready state
2747
//
2748
// indexMutex lock must be acquired before calling the function
2749
func (cont *AciController) isDelayedEndpoint(endpoint *discovery.Endpoint, svckey string) bool {
×
2750
        delayed := false
×
2751
        endpointips := cont.getEndpointSliceEpIps(endpoint)
×
2752
        for _, delayedepslices := range cont.delayedEpSlices {
×
2753
                if delayedepslices.ServiceKey == svckey {
×
2754
                        var found bool
×
2755
                        epslice := delayedepslices.OldEpSlice
×
2756
                        for ix := range epslice.Endpoints {
×
2757
                                epips := cont.getEndpointSliceEpIps(&epslice.Endpoints[ix])
×
2758
                                if reflect.DeepEqual(endpointips, epips) {
×
2759
                                        // case 2
×
2760
                                        if epslice.Endpoints[ix].Conditions.Ready != nil && !*epslice.Endpoints[ix].Conditions.Ready {
×
2761
                                                delayed = true
×
2762
                                        }
×
2763
                                        found = true
×
2764
                                }
2765
                        }
2766
                        // case 1
2767
                        if !found {
×
2768
                                delayed = true
×
2769
                        }
×
2770
                }
2771
        }
2772
        return delayed
×
2773
}
2774

2775
// set nodemap only if endoint is ready and not in delayedEpSlices
2776
func (cont *AciController) setNodeMapDelay(nodeMap map[string]*metadata.ServiceEndpoint,
2777
        endpoint *discovery.Endpoint, service *v1.Service) {
×
2778
        svckey, err := cache.MetaNamespaceKeyFunc(service)
×
2779
        if err != nil {
×
2780
                cont.log.Error("Could not create service key: ", err)
×
2781
                return
×
2782
        }
×
2783
        if cont.config.NoWaitForServiceEpReadiness ||
×
2784
                (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {
×
2785
                if endpoint.NodeName != nil && *endpoint.NodeName != "" {
×
2786
                        // donot setNodeMap for endpoint if:
×
2787
                        //   endpoint is newly added
×
2788
                        //   endpoint status changed from not ready to ready
×
2789
                        if !cont.isDelayedEndpoint(endpoint, svckey) {
×
2790
                                cont.setNodeMap(nodeMap, *endpoint.NodeName)
×
2791
                        }
×
2792
                }
2793
        }
2794
}
2795

2796
func (seps *serviceEndpointSlice) GetnodesMetadata(key string,
2797
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2798
        cont := seps.cont
1✔
2799
        // 1. Get all the Endpoint slices matching the label service-name
1✔
2800
        // 2. update the node map matching with endpoints nodes name
1✔
2801
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2802
        selector := labels.SelectorFromSet(label)
1✔
2803
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2804
                func(endpointSliceobj interface{}) {
2✔
2805
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2806
                        for ix := range endpointSlices.Endpoints {
2✔
2807
                                if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2808
                                        cont.setNodeMapDelay(nodeMap, &endpointSlices.Endpoints[ix], service)
×
2809
                                } else if cont.config.NoWaitForServiceEpReadiness ||
1✔
2810
                                        (endpointSlices.Endpoints[ix].Conditions.Ready != nil && *endpointSlices.Endpoints[ix].Conditions.Ready) {
2✔
2811
                                        if endpointSlices.Endpoints[ix].NodeName != nil && *endpointSlices.Endpoints[ix].NodeName != "" {
2✔
2812
                                                cont.setNodeMap(nodeMap, *endpointSlices.Endpoints[ix].NodeName)
1✔
2813
                                        }
1✔
2814
                                }
2815
                        }
2816
                })
2817
        cont.log.Debug("NodeMap: ", nodeMap)
1✔
2818
}
2819

2820
func (seps *serviceEndpointSlice) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2821
        cont := seps.cont
1✔
2822
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2823
        selector := labels.SelectorFromSet(label)
1✔
2824
        epcount := 0
1✔
2825
        childs := make(map[string]struct{})
1✔
2826
        var exists = struct{}{}
1✔
2827
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2828
                func(endpointSliceobj interface{}) {
2✔
2829
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2830
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2831
                                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
1✔
2832
                                        continue
×
2833
                                }
2834
                                epcount++
1✔
2835
                                childs[endpoint.TargetRef.Name] = exists
1✔
2836
                                cont.log.Debug("EndPoint added: ", endpoint.TargetRef.Name)
1✔
2837
                        }
2838
                })
2839
        for child := range childs {
2✔
2840
                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(), child))
1✔
2841
        }
1✔
2842
        return epcount != 0
1✔
2843
}
2844

2845
func getProtocolStr(proto v1.Protocol) string {
1✔
2846
        var protostring string
1✔
2847
        switch proto {
1✔
2848
        case v1.ProtocolUDP:
1✔
2849
                protostring = "udp"
1✔
2850
        case v1.ProtocolTCP:
1✔
2851
                protostring = "tcp"
1✔
2852
        case v1.ProtocolSCTP:
×
2853
                protostring = "sctp"
×
2854
        default:
×
2855
                protostring = "tcp"
×
2856
        }
2857
        return protostring
1✔
2858
}
2859

2860
func (cont *AciController) removeIpFromIngressIPList(ingressIps *[]net.IP, ip net.IP) {
×
2861
        cont.returnServiceIps([]net.IP{ip})
×
2862
        index := -1
×
2863
        for i, v := range *ingressIps {
×
2864
                if v.Equal(ip) {
×
2865
                        index = i
×
2866
                        break
×
2867
                }
2868
        }
2869
        if index == -1 {
×
2870
                return
×
2871
        }
×
2872
        *ingressIps = append((*ingressIps)[:index], (*ingressIps)[index+1:]...)
×
2873
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc