• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11557

28 Jan 2026 10:15AM UTC coverage: 62.791% (-0.3%) from 63.113%
11557

Pull #1658

travis-pro

jeffinkottaram
Add named port support for NetworkPolicy service augmentation

The controller previously did not handle named target ports when
applying NetworkPolicy egress rules with service augmentation. This
caused egress policies to fail for services using named ports, as
the controller could not match named ports specified in NetworkPolicy
rules against actual service endpoints.

This commit adds tracking and resolution of named target ports and
enforces stricter all-or-nothing port coverage semantics for service
augmentation.

Key changes:
- Add namedPortServiceIndex for empty egress 'To' rules (no peer
  selectors) to track and resolve services with named target ports
- For rules with peer selectors, resolve named ports directly from
  EndpointSlices during NetworkPolicy port processing
- Change targetPort.ports from []int to map[int]bool to efficiently
  verify all endpoint IPs are covered by allowed ports
- Enforce exclusion of services with incomplete endpoint coverage
  (previously not properly enforced)
- Fix ruleCounter to generate unique HPP rule names when multiple
  named ports exist in same rule (prevents duplicate identifiers)
- Support port range specifications that involve named ports

Service augmentation now correctly handles named ports and ensures
services are only augmented when all endpoint ports are covered.
Pull Request #1658: Add named port support for NetworkPolicy service augmentation

125 of 265 new or added lines in 3 files covered. (47.17%)

6 existing lines in 3 files now uncovered.

13414 of 21363 relevant lines covered (62.79%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.53
/pkg/controller/services.go
1
// Copyright 2016 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "errors"
19
        "fmt"
20
        "net"
21
        "reflect"
22
        "regexp"
23
        "sort"
24
        "strconv"
25
        "strings"
26
        "time"
27

28
        "github.com/noironetworks/aci-containers/pkg/apicapi"
29
        "github.com/noironetworks/aci-containers/pkg/metadata"
30
        "github.com/noironetworks/aci-containers/pkg/util"
31
        "github.com/sirupsen/logrus"
32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
        "k8s.io/apimachinery/pkg/fields"
36
        "k8s.io/apimachinery/pkg/labels"
37
        "k8s.io/apimachinery/pkg/util/intstr"
38
        "k8s.io/client-go/kubernetes"
39
        "k8s.io/client-go/tools/cache"
40
)
41

42
// Defaults applied to service-related configuration.
const (
	// DefaultServiceContractScope is the default service contract scope value.
	DefaultServiceContractScope = "context"

	// DefaultServiceExtSubNetShared is the default service ext subnet scope
	// setting (enable shared security); it is off by default.
	DefaultServiceExtSubNetShared = false
)
47

48
func (cont *AciController) initEndpointsInformerFromClient(
49
        kubeClient kubernetes.Interface) {
×
50
        cont.initEndpointsInformerBase(
×
51
                cache.NewListWatchFromClient(
×
52
                        kubeClient.CoreV1().RESTClient(), "endpoints",
×
53
                        metav1.NamespaceAll, fields.Everything()))
×
54
}
×
55

56
func (cont *AciController) initEndpointSliceInformerFromClient(
57
        kubeClient kubernetes.Interface) {
×
58
        cont.initEndpointSliceInformerBase(
×
59
                cache.NewListWatchFromClient(
×
60
                        kubeClient.DiscoveryV1().RESTClient(), "endpointslices",
×
61
                        metav1.NamespaceAll, fields.Everything()))
×
62
}
×
63

64
func (cont *AciController) initEndpointSliceInformerBase(listWatch *cache.ListWatch) {
1✔
65
        cont.endpointSliceIndexer, cont.endpointSliceInformer = cache.NewIndexerInformer(
1✔
66
                listWatch, &discovery.EndpointSlice{}, 0,
1✔
67
                cache.ResourceEventHandlerFuncs{
1✔
68
                        AddFunc: func(obj interface{}) {
2✔
69
                                cont.endpointSliceAdded(obj)
1✔
70
                        },
1✔
71
                        UpdateFunc: func(oldobj interface{}, newobj interface{}) {
1✔
72
                                cont.endpointSliceUpdated(oldobj, newobj)
1✔
73
                        },
1✔
74
                        DeleteFunc: func(obj interface{}) {
1✔
75
                                cont.endpointSliceDeleted(obj)
1✔
76
                        },
1✔
77
                },
78
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
79
        )
80
}
81

82
func (cont *AciController) initEndpointsInformerBase(listWatch *cache.ListWatch) {
1✔
83
        cont.endpointsIndexer, cont.endpointsInformer = cache.NewIndexerInformer(
1✔
84
                listWatch, &v1.Endpoints{}, 0,
1✔
85
                cache.ResourceEventHandlerFuncs{
1✔
86
                        AddFunc: func(obj interface{}) {
2✔
87
                                cont.endpointsAdded(obj)
1✔
88
                        },
1✔
89
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
90
                                cont.endpointsUpdated(old, new)
1✔
91
                        },
1✔
92
                        DeleteFunc: func(obj interface{}) {
1✔
93
                                cont.endpointsDeleted(obj)
1✔
94
                        },
1✔
95
                },
96
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
97
        )
98
}
99

100
func (cont *AciController) initServiceInformerFromClient(
101
        kubeClient *kubernetes.Clientset) {
×
102
        cont.initServiceInformerBase(
×
103
                cache.NewListWatchFromClient(
×
104
                        kubeClient.CoreV1().RESTClient(), "services",
×
105
                        metav1.NamespaceAll, fields.Everything()))
×
106
}
×
107

108
func (cont *AciController) initServiceInformerBase(listWatch *cache.ListWatch) {
1✔
109
        cont.serviceIndexer, cont.serviceInformer = cache.NewIndexerInformer(
1✔
110
                listWatch, &v1.Service{}, 0,
1✔
111
                cache.ResourceEventHandlerFuncs{
1✔
112
                        AddFunc: func(obj interface{}) {
2✔
113
                                cont.serviceAdded(obj)
1✔
114
                        },
1✔
115
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
116
                                cont.serviceUpdated(old, new)
1✔
117
                        },
1✔
118
                        DeleteFunc: func(obj interface{}) {
×
119
                                cont.serviceDeleted(obj)
×
120
                        },
×
121
                },
122
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
123
        )
124
}
125

126
func serviceLogger(log *logrus.Logger, as *v1.Service) *logrus.Entry {
1✔
127
        return log.WithFields(logrus.Fields{
1✔
128
                "namespace": as.ObjectMeta.Namespace,
1✔
129
                "name":      as.ObjectMeta.Name,
1✔
130
                "type":      as.Spec.Type,
1✔
131
        })
1✔
132
}
1✔
133

134
func (cont *AciController) queueIPNetPolUpdates(ips map[string]bool) {
1✔
135
        for ipStr := range ips {
2✔
136
                ip := net.ParseIP(ipStr)
1✔
137
                if ip == nil {
1✔
138
                        continue
×
139
                }
140
                entries, err := cont.netPolSubnetIndex.ContainingNetworks(ip)
1✔
141
                if err != nil {
1✔
142
                        cont.log.Error("Corrupted network policy IP index, err: ", err)
×
143
                        return
×
144
                }
×
145
                for _, entry := range entries {
2✔
146
                        for npkey := range entry.(*ipIndexEntry).keys {
2✔
147
                                cont.queueNetPolUpdateByKey(npkey)
1✔
148
                        }
1✔
149
                }
150
        }
151
}
152

153
func (cont *AciController) queuePortNetPolUpdates(ports map[string]targetPort) {
1✔
154
        for portkey := range ports {
2✔
155
                entry := cont.targetPortIndex[portkey]
1✔
156
                if entry == nil {
2✔
157
                        continue
1✔
158
                }
159
                for npkey := range entry.networkPolicyKeys {
2✔
160
                        cont.queueNetPolUpdateByKey(npkey)
1✔
161
                }
1✔
162
        }
163
}
164

165
func (cont *AciController) queueNetPolForEpAddrs(addrs []v1.EndpointAddress) {
1✔
166
        for _, addr := range addrs {
2✔
167
                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" ||
1✔
168
                        addr.TargetRef.Namespace == "" || addr.TargetRef.Name == "" {
2✔
169
                        continue
1✔
170
                }
171
                podkey := addr.TargetRef.Namespace + "/" + addr.TargetRef.Name
1✔
172
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
173
                ps := make(map[string]bool)
1✔
174
                for _, npkey := range npkeys {
2✔
175
                        cont.queueNetPolUpdateByKey(npkey)
1✔
176
                        ps[npkey] = true
1✔
177
                }
1✔
178
                // Process if the  any matching namedport wildcard policy is present
179
                // ignore np already processed policies
180
                cont.queueMatchingNamedNp(ps, podkey)
1✔
181
        }
182
}
183

184
func (cont *AciController) queueMatchingNamedNp(served map[string]bool, podkey string) {
1✔
185
        cont.indexMutex.Lock()
1✔
186
        for npkey := range cont.nmPortNp {
2✔
187
                if _, ok := served[npkey]; !ok {
2✔
188
                        if cont.checkPodNmpMatchesNp(npkey, podkey) {
2✔
189
                                cont.queueNetPolUpdateByKey(npkey)
1✔
190
                        }
1✔
191
                }
192
        }
193
        cont.indexMutex.Unlock()
1✔
194
}
195
func (cont *AciController) queueEndpointsNetPolUpdates(endpoints *v1.Endpoints) {
1✔
196
        for _, subset := range endpoints.Subsets {
2✔
197
                cont.queueNetPolForEpAddrs(subset.Addresses)
1✔
198
                cont.queueNetPolForEpAddrs(subset.NotReadyAddresses)
1✔
199
        }
1✔
200
}
201

202
func (cont *AciController) returnServiceIps(ips []net.IP) {
1✔
203
        for _, ip := range ips {
2✔
204
                if ip.To4() != nil {
2✔
205
                        cont.serviceIps.DeallocateIp(ip)
1✔
206
                } else if ip.To16() != nil {
3✔
207
                        cont.serviceIps.DeallocateIp(ip)
1✔
208
                }
1✔
209
        }
210
}
211

212
func returnIps(pool *netIps, ips []net.IP) {
1✔
213
        for _, ip := range ips {
2✔
214
                if ip.To4() != nil {
2✔
215
                        pool.V4.AddIp(ip)
1✔
216
                } else if ip.To16() != nil {
3✔
217
                        pool.V6.AddIp(ip)
1✔
218
                }
1✔
219
        }
220
}
221

222
func (cont *AciController) staticMonPolName() string {
1✔
223
        return cont.aciNameForKey("monPol", cont.env.ServiceBd())
1✔
224
}
1✔
225

226
func (cont *AciController) staticMonPolDn() string {
1✔
227
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
228
                return fmt.Sprintf("uni/tn-%s/ipslaMonitoringPol-%s",
1✔
229
                        cont.config.AciVrfTenant, cont.staticMonPolName())
1✔
230
        }
1✔
231
        return ""
×
232
}
233

234
func (cont *AciController) staticServiceObjs() apicapi.ApicSlice {
1✔
235
        var serviceObjs apicapi.ApicSlice
1✔
236

1✔
237
        // Service bridge domain
1✔
238
        bdName := cont.aciNameForKey("bd", cont.env.ServiceBd())
1✔
239
        bd := apicapi.NewFvBD(cont.config.AciVrfTenant, bdName)
1✔
240
        if apicapi.ApicVersion >= "6.0(4c)" {
1✔
241
                bd.SetAttr("serviceBdRoutingDisable", "yes")
×
242
        }
×
243
        bd.SetAttr("arpFlood", "yes")
1✔
244
        bd.SetAttr("ipLearning", "no")
1✔
245
        bd.SetAttr("unkMacUcastAct", cont.config.UnknownMacUnicastAction)
1✔
246
        bdToOut := apicapi.NewRsBdToOut(bd.GetDn(), cont.config.AciL3Out)
1✔
247
        bd.AddChild(bdToOut)
1✔
248
        bdToVrf := apicapi.NewRsCtx(bd.GetDn(), cont.config.AciVrf)
1✔
249
        bd.AddChild(bdToVrf)
1✔
250

1✔
251
        bdn := bd.GetDn()
1✔
252
        for _, cidr := range cont.config.NodeServiceSubnets {
2✔
253
                sn := apicapi.NewFvSubnet(bdn, cidr)
1✔
254
                bd.AddChild(sn)
1✔
255
        }
1✔
256
        serviceObjs = append(serviceObjs, bd)
1✔
257

1✔
258
        // Service IP SLA monitoring policy
1✔
259
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
260
                monPol := apicapi.NewFvIPSLAMonitoringPol(cont.config.AciVrfTenant,
1✔
261
                        cont.staticMonPolName())
1✔
262
                monPol.SetAttr("slaFrequency",
1✔
263
                        strconv.Itoa(cont.config.AciServiceMonitorInterval))
1✔
264
                serviceObjs = append(serviceObjs, monPol)
1✔
265
        }
1✔
266

267
        return serviceObjs
1✔
268
}
269

270
func (cont *AciController) initStaticServiceObjs() {
1✔
271
        cont.apicConn.WriteApicObjects(cont.config.AciPrefix+"_service_static",
1✔
272
                cont.staticServiceObjs())
1✔
273
}
1✔
274

275
func (cont *AciController) updateServicesForNode(nodename string) {
1✔
276
        cont.serviceEndPoints.UpdateServicesForNode(nodename)
1✔
277
}
1✔
278

279
// must have index lock
280
func (cont *AciController) getActiveFabricPathDn(node string) string {
×
281
        var fabricPathDn string
×
282
        sz := len(cont.nodeOpflexDevice[node])
×
283
        for i := range cont.nodeOpflexDevice[node] {
×
284
                device := cont.nodeOpflexDevice[node][sz-1-i]
×
285
                if device.GetAttrStr("state") == "connected" {
×
286
                        fabricPathDn = device.GetAttrStr("fabricPathDn")
×
287
                        break
×
288
                }
289
        }
290
        return fabricPathDn
×
291
}
292

293
func (cont *AciController) getOpflexOdevCount(node string) (int, string) {
×
294
        fabricPathDnCount := make(map[string]int)
×
295
        var maxCount int
×
296
        var fabricPathDn string
×
297
        for _, device := range cont.nodeOpflexDevice[node] {
×
298
                fabricpathdn := device.GetAttrStr("fabricPathDn")
×
299
                count, ok := fabricPathDnCount[fabricpathdn]
×
300
                if ok {
×
301
                        fabricPathDnCount[fabricpathdn] = count + 1
×
302
                } else {
×
303
                        fabricPathDnCount[fabricpathdn] = 1
×
304
                }
×
305
                if fabricPathDnCount[fabricpathdn] >= maxCount {
×
306
                        maxCount = fabricPathDnCount[fabricpathdn]
×
307
                        fabricPathDn = fabricpathdn
×
308
                }
×
309
        }
310
        return maxCount, fabricPathDn
×
311
}
312

313
func deleteDevicesFromList(delDevices, devices apicapi.ApicSlice) apicapi.ApicSlice {
×
314
        var newDevices apicapi.ApicSlice
×
315
        for _, device := range devices {
×
316
                found := false
×
317
                for _, delDev := range delDevices {
×
318
                        if reflect.DeepEqual(delDev, device) {
×
319
                                found = true
×
320
                        }
×
321
                }
322
                if !found {
×
323
                        newDevices = append(newDevices, device)
×
324
                }
×
325
        }
326
        return newDevices
×
327
}
328

329
func (cont *AciController) getAciPodSubnet(pod string) (string, error) {
×
330
        podslice := strings.Split(pod, "-")
×
331
        if len(podslice) < 2 {
×
332
                return "", fmt.Errorf("Failed to get podid from pod")
×
333
        }
×
334
        podid := podslice[1]
×
335
        var subnet string
×
336
        args := []string{
×
337
                "query-target=self",
×
338
        }
×
339
        url := fmt.Sprintf("/api/node/mo/uni/controller/setuppol/setupp-%s.json?%s", podid, strings.Join(args, "&"))
×
340
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
341
        if err != nil {
×
342
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
343
                return subnet, err
×
344
        }
×
345
        for _, obj := range apicresp.Imdata {
×
346
                for _, body := range obj {
×
347
                        tepPool, ok := body.Attributes["tepPool"].(string)
×
348
                        if ok {
×
349
                                subnet = tepPool
×
350
                                break
×
351
                        }
352
                }
353
        }
354
        return subnet, nil
×
355
}
356

357
func (cont *AciController) isSingleOpflexOdev(fabricPathDn string) (bool, error) {
×
358
        pathSlice := strings.Split(fabricPathDn, "/")
×
359
        if len(pathSlice) > 2 {
×
360
                path := pathSlice[2]
×
361
                // topology/<aci_pod_name>/paths-<id>/pathep-<iface> - fabricPathDn if
×
362
                // host is connnected via single link
×
363
                // topology/<aci_pod_name>/protpaths-<id_1>-<id_2>/pathep-<iface> - fabricPathDn if
×
364
                // host is connected to vpc pair
×
365
                if strings.Contains(path, "protpaths-") {
×
366
                        args := []string{
×
367
                                "query-target=self",
×
368
                        }
×
369
                        url := fmt.Sprintf("/api/node/class/vpcDom.json?%s", strings.Join(args, "&"))
×
370
                        apicresp, err := cont.apicConn.GetApicResponse(url)
×
371
                        if err != nil {
×
372
                                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
373
                                return false, err
×
374
                        }
×
375
                        nodeIdSlice := strings.Split(path, "-")
×
376
                        if len(nodeIdSlice) != 3 {
×
377
                                cont.log.Error("Invalid fabricPathDn ", fabricPathDn)
×
378
                                return false, fmt.Errorf("Invalid path in fabricPathDn %s", fabricPathDn)
×
379
                        }
×
380
                        // As host is connected to vpc pair, check if the status of any of the vpcDom is down
381
                        upCount := 0
×
382
                        for i, nodeId := range nodeIdSlice {
×
383
                                instDn := fmt.Sprintf("topology/%s/node-%s/sys/vpc/inst", pathSlice[1], nodeId)
×
384
                                if i == 0 {
×
385
                                        continue
×
386
                                }
387
                                for _, obj := range apicresp.Imdata {
×
388
                                        for _, body := range obj {
×
389
                                                dn, ok := body.Attributes["dn"].(string)
×
390
                                                if ok && strings.Contains(dn, instDn) {
×
391
                                                        peerSt, ok := body.Attributes["peerSt"].(string)
×
392
                                                        if ok && peerSt == "up" {
×
393
                                                                upCount++
×
394
                                                        }
×
395
                                                }
396
                                        }
397
                                }
398
                        }
399
                        if upCount < 2 {
×
400
                                return true, nil
×
401
                        }
×
402
                        return false, nil
×
403
                } else {
×
404
                        return true, nil
×
405
                }
×
406
        }
407
        return false, fmt.Errorf("Invalid fabricPathDn %s ", fabricPathDn)
×
408
}
409

410
func (cont *AciController) createAciPodAnnotation(node string) (aciPodAnnot, error) {
×
411
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
412
        nodeAciPodAnnot := cont.nodeACIPod[node]
×
413
        isSingleOdev := false
×
414
        if odevCount == 1 {
×
415
                var err error
×
416
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
417
                if err != nil {
×
418
                        cont.log.Error(err)
×
419
                        return nodeAciPodAnnot, err
×
420
                }
×
421
        }
422
        if (odevCount == 0) ||
×
423
                (odevCount == 1 && !isSingleOdev) {
×
424
                if nodeAciPodAnnot.aciPod != "none" {
×
425
                        if nodeAciPodAnnot.disconnectTime.IsZero() {
×
426
                                nodeAciPodAnnot.disconnectTime = time.Now()
×
427
                        } else {
×
428
                                currentTime := time.Now()
×
429
                                diff := currentTime.Sub(nodeAciPodAnnot.disconnectTime)
×
430
                                if diff.Seconds() > float64(cont.config.OpflexDeviceReconnectWaitTimeout) {
×
431
                                        nodeAciPodAnnot.aciPod = "none"
×
432
                                        nodeAciPodAnnot.disconnectTime = time.Time{}
×
433
                                }
×
434
                        }
435
                }
436
                return nodeAciPodAnnot, nil
×
437
        } else if (odevCount == 2) ||
×
438
                (odevCount == 1 && isSingleOdev) {
×
439
                // when there is already a connected opflex device,
×
440
                // fabricPathDn will have latest pod iformation
×
441
                // and annotation will be in the form pod-<podid>-<subnet of pod>
×
442
                nodeAciPodAnnot.disconnectTime = time.Time{}
×
443
                nodeAciPod := nodeAciPodAnnot.aciPod
×
444
                pathSlice := strings.Split(fabricPathDn, "/")
×
445
                if len(pathSlice) > 1 {
×
446
                        pod := pathSlice[1]
×
447

×
448
                        // when there is difference in pod info avaliable from fabricPathDn
×
449
                        // and what we have in cache, update info in cache and change annotation on node
×
450
                        if !strings.Contains(nodeAciPod, pod) {
×
451
                                subnet, err := cont.getAciPodSubnet(pod)
×
452
                                if err != nil {
×
453
                                        cont.log.Error("Failed to get subnet of aci pod ", err.Error())
×
454
                                        return nodeAciPodAnnot, err
×
455
                                } else {
×
456
                                        nodeAciPodAnnot.aciPod = pod + "-" + subnet
×
457
                                        return nodeAciPodAnnot, nil
×
458
                                }
×
459
                        } else {
×
460
                                return nodeAciPodAnnot, nil
×
461
                        }
×
462
                } else {
×
463
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
464
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
465
                }
×
466
        }
467
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
468
}
469

470
func (cont *AciController) createNodeAciPodAnnotation(node string) (aciPodAnnot, error) {
×
471
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
472
        nodeAciPodAnnot := cont.nodeACIPodAnnot[node]
×
473
        isSingleOdev := false
×
474
        if odevCount == 1 {
×
475
                var err error
×
476
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
477
                if err != nil {
×
478
                        cont.log.Error(err)
×
479
                        return nodeAciPodAnnot, err
×
480
                }
×
481
        }
482
        if (odevCount == 0) ||
×
483
                (odevCount == 1 && !isSingleOdev) {
×
484
                if nodeAciPodAnnot.aciPod != "none" {
×
485
                        nodeAciPodAnnot.aciPod = "none"
×
486
                }
×
487
                return nodeAciPodAnnot, nil
×
488
        } else if (odevCount == 2) ||
×
489
                (odevCount == 1 && isSingleOdev) {
×
490
                pathSlice := strings.Split(fabricPathDn, "/")
×
491
                if len(pathSlice) > 1 {
×
492

×
493
                        nodeAciPodAnnot.aciPod = pathSlice[1]
×
494
                        return nodeAciPodAnnot, nil
×
495
                } else {
×
496
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
497
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
498
                }
×
499
        }
500
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
501
}
502

503
func (cont *AciController) checkChangeOfOpflexOdevAciPod() {
×
504
        var nodeAnnotationUpdates []string
×
505
        cont.apicConn.SyncMutex.Lock()
×
506
        syncDone := cont.apicConn.SyncDone
×
507
        cont.apicConn.SyncMutex.Unlock()
×
508

×
509
        if !syncDone {
×
510
                return
×
511
        }
×
512

513
        cont.indexMutex.Lock()
×
514
        for node := range cont.nodeACIPodAnnot {
×
515
                annot, err := cont.createNodeAciPodAnnotation(node)
×
516
                if err != nil {
×
517
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
518
                                now := time.Now()
×
519
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
520
                                        annot.lastErrorTime = now
×
521
                                        cont.nodeACIPodAnnot[node] = annot
×
522
                                        cont.log.Error(err.Error())
×
523
                                }
×
524
                        } else {
×
525
                                cont.log.Error(err.Error())
×
526
                        }
×
527
                } else {
×
528
                        if annot != cont.nodeACIPodAnnot[node] {
×
529
                                cont.nodeACIPodAnnot[node] = annot
×
530
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
531
                        }
×
532
                }
533
        }
534
        cont.indexMutex.Unlock()
×
535
        if len(nodeAnnotationUpdates) > 0 {
×
536
                for _, updatednode := range nodeAnnotationUpdates {
×
537
                        go cont.env.NodeAnnotationChanged(updatednode)
×
538
                }
×
539
        }
540
}
541

542
func (cont *AciController) checkChangeOfOdevAciPod() {
×
543
        var nodeAnnotationUpdates []string
×
544
        cont.apicConn.SyncMutex.Lock()
×
545
        syncDone := cont.apicConn.SyncDone
×
546
        cont.apicConn.SyncMutex.Unlock()
×
547

×
548
        if !syncDone {
×
549
                return
×
550
        }
×
551

552
        cont.indexMutex.Lock()
×
553
        for node := range cont.nodeACIPod {
×
554
                annot, err := cont.createAciPodAnnotation(node)
×
555
                if err != nil {
×
556
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
557
                                now := time.Now()
×
558
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
559
                                        annot.lastErrorTime = now
×
560
                                        cont.nodeACIPod[node] = annot
×
561
                                        cont.log.Error(err.Error())
×
562
                                }
×
563
                        } else {
×
564
                                cont.log.Error(err.Error())
×
565
                        }
×
566
                } else {
×
567
                        if annot != cont.nodeACIPod[node] {
×
568
                                cont.nodeACIPod[node] = annot
×
569
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
570
                        }
×
571
                }
572
        }
573
        cont.indexMutex.Unlock()
×
574
        if len(nodeAnnotationUpdates) > 0 {
×
575
                for _, updatednode := range nodeAnnotationUpdates {
×
576
                        go cont.env.NodeAnnotationChanged(updatednode)
×
577
                }
×
578
        }
579
}
580

581
func (cont *AciController) deleteOldOpflexDevices() {
1✔
582
        var nodeUpdates []string
1✔
583
        cont.indexMutex.Lock()
1✔
584
        for node, devices := range cont.nodeOpflexDevice {
1✔
585
                var delDevices apicapi.ApicSlice
×
586
                fabricPathDn := cont.getActiveFabricPathDn(node)
×
587
                if fabricPathDn != "" {
×
588
                        for _, device := range devices {
×
589
                                if device.GetAttrStr("delete") == "true" && device.GetAttrStr("fabricPathDn") != fabricPathDn {
×
590
                                        deleteTimeStr := device.GetAttrStr("deleteTime")
×
591
                                        deleteTime, err := time.Parse(time.RFC3339, deleteTimeStr)
×
592
                                        if err != nil {
×
593
                                                cont.log.Error("Failed to parse opflex device delete time: ", err)
×
594
                                                continue
×
595
                                        }
596
                                        now := time.Now()
×
597
                                        diff := now.Sub(deleteTime)
×
598
                                        if diff.Seconds() >= cont.config.OpflexDeviceDeleteTimeout {
×
599
                                                delDevices = append(delDevices, device)
×
600
                                        }
×
601
                                }
602
                        }
603
                        if len(delDevices) > 0 {
×
604
                                newDevices := deleteDevicesFromList(delDevices, devices)
×
605
                                cont.nodeOpflexDevice[node] = newDevices
×
606
                                cont.log.Info("Opflex device list for node ", node, " after deleting stale entries: ", cont.nodeOpflexDevice[node])
×
607
                                if len(newDevices) == 0 {
×
608
                                        delete(cont.nodeOpflexDevice, node)
×
609
                                }
×
610
                                nodeUpdates = append(nodeUpdates, node)
×
611
                        }
612
                }
613
        }
614
        cont.indexMutex.Unlock()
1✔
615
        if len(nodeUpdates) > 0 {
1✔
616
                cont.postOpflexDeviceDelete(nodeUpdates)
×
617
        }
×
618
}
619

620
// must have index lock
621
func (cont *AciController) setDeleteFlagForOldDevices(node, fabricPathDn string) {
1✔
622
        for _, device := range cont.nodeOpflexDevice[node] {
2✔
623
                if device.GetAttrStr("fabricPathDn") != fabricPathDn {
1✔
624
                        t := time.Now()
×
625
                        device.SetAttr("delete", "true")
×
626
                        device.SetAttr("deleteTime", t.Format(time.RFC3339))
×
627
                }
×
628
        }
629
}
630

631
// must have index lock
632
func (cont *AciController) fabricPathForNode(name string) (string, bool) {
1✔
633
        sz := len(cont.nodeOpflexDevice[name])
1✔
634
        for i := range cont.nodeOpflexDevice[name] {
2✔
635
                device := cont.nodeOpflexDevice[name][sz-1-i]
1✔
636
                deviceState := device.GetAttrStr("state")
1✔
637
                if deviceState == "connected" {
2✔
638
                        if deviceState != device.GetAttrStr("prevState") {
2✔
639
                                cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Info("Processing fabric path for node ",
1✔
640
                                        "when connected device state is found")
1✔
641
                                device.SetAttr("prevState", deviceState)
1✔
642
                        }
1✔
643
                        fabricPathDn := device.GetAttrStr("fabricPathDn")
1✔
644
                        cont.setDeleteFlagForOldDevices(name, fabricPathDn)
1✔
645
                        return fabricPathDn, true
1✔
646
                } else {
1✔
647
                        device.SetAttr("prevState", deviceState)
1✔
648
                }
1✔
649
        }
650
        if sz > 0 {
2✔
651
                // When the opflex-device for a node changes, for example during a live migration,
1✔
652
                // we end up with both the old and the new device objects till the old object
1✔
653
                // ages out on APIC. The new object is at end of the devices list (see opflexDeviceChanged),
1✔
654
                // so we return the fabricPathDn of the last opflex-device.
1✔
655
                cont.fabricPathLogger(cont.nodeOpflexDevice[name][sz-1].GetAttrStr("hostName"),
1✔
656
                        cont.nodeOpflexDevice[name][sz-1]).Info("Processing fabricPathDn for node")
1✔
657
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("fabricPathDn"), true
1✔
658
        }
1✔
659
        return "", false
1✔
660
}
661

662
// must have index lock
663
func (cont *AciController) deviceMacForNode(name string) (string, bool) {
1✔
664
        sz := len(cont.nodeOpflexDevice[name])
1✔
665
        if sz > 0 {
2✔
666
                // When the opflex-device for a node changes, for example when the
1✔
667
                // node is recreated, we end up with both the old and the new
1✔
668
                // device objects till the old object ages out on APIC. The
1✔
669
                // new object is at end of the devices list (see
1✔
670
                // opflexDeviceChanged), so we return the MAC address of the
1✔
671
                // last opflex-device.
1✔
672
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("mac"), true
1✔
673
        }
1✔
674
        return "", false
1✔
675
}
676

677
func apicRedirectDst(rpDn string, ip string, mac string,
678
        descr string, healthGroupDn string, enablePbrTracking bool) apicapi.ApicObject {
1✔
679
        dst := apicapi.NewVnsRedirectDest(rpDn, ip, mac).SetAttr("descr", descr)
1✔
680
        if healthGroupDn != "" && enablePbrTracking {
2✔
681
                dst.AddChild(apicapi.NewVnsRsRedirectHealthGroup(dst.GetDn(),
1✔
682
                        healthGroupDn))
1✔
683
        }
1✔
684
        return dst
1✔
685
}
686

687
func (cont *AciController) apicRedirectPol(name string, tenantName string, nodes []string,
688
        nodeMap map[string]*metadata.ServiceEndpoint,
689
        monPolDn string, enablePbrTracking bool) (apicapi.ApicObject, string) {
1✔
690
        rp := apicapi.NewVnsSvcRedirectPol(tenantName, name)
1✔
691
        rp.SetAttr("thresholdDownAction", "deny")
1✔
692
        if cont.config.DisableResilientHashing {
1✔
693
                rp.SetAttr("resilientHashEnabled", "no")
×
694
        }
×
695
        rpDn := rp.GetDn()
1✔
696
        for _, node := range nodes {
2✔
697
                cont.indexMutex.Lock()
1✔
698
                serviceEp, ok := nodeMap[node]
1✔
699
                if !ok {
1✔
700
                        continue
×
701
                }
702
                if serviceEp.Ipv4 != nil {
2✔
703
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv4.String(),
1✔
704
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
1✔
705
                }
1✔
706
                if serviceEp.Ipv6 != nil {
1✔
707
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv6.String(),
×
708
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
×
709
                }
×
710
                cont.indexMutex.Unlock()
1✔
711
        }
712
        if monPolDn != "" && enablePbrTracking {
2✔
713
                rp.AddChild(apicapi.NewVnsRsIPSLAMonitoringPol(rpDn, monPolDn))
1✔
714
        }
1✔
715
        return rp, rpDn
1✔
716
}
717

718
func apicExtNetCreate(enDn string, ingress string, ipv4 bool,
719
        cidr bool, sharedSec bool) apicapi.ApicObject {
1✔
720
        if !cidr {
2✔
721
                if ipv4 {
2✔
722
                        ingress += "/32"
1✔
723
                } else {
1✔
724
                        ingress += "/128"
×
725
                }
×
726
        }
727
        subnet := apicapi.NewL3extSubnet(enDn, ingress, "", "")
1✔
728
        if sharedSec {
2✔
729
                subnet.SetAttr("scope", "import-security,shared-security")
1✔
730
        }
1✔
731
        return subnet
1✔
732
}
733

734
func apicExtNet(name string, tenantName string, l3Out string,
735
        ingresses []string, sharedSecurity bool, snat bool) apicapi.ApicObject {
1✔
736
        en := apicapi.NewL3extInstP(tenantName, l3Out, name)
1✔
737
        enDn := en.GetDn()
1✔
738
        if snat {
2✔
739
                en.AddChild(apicapi.NewFvRsCons(enDn, name))
1✔
740
        } else {
2✔
741
                en.AddChild(apicapi.NewFvRsProv(enDn, name))
1✔
742
        }
1✔
743

744
        for _, ingress := range ingresses {
2✔
745
                ip, _, _ := net.ParseCIDR(ingress)
1✔
746
                // If ingress is a subnet
1✔
747
                if ip != nil {
2✔
748
                        if ip != nil && ip.To4() != nil {
2✔
749
                                subnet := apicExtNetCreate(enDn, ingress, true, true, sharedSecurity)
1✔
750
                                en.AddChild(subnet)
1✔
751
                        } else if ip != nil && ip.To16() != nil {
1✔
752
                                subnet := apicExtNetCreate(enDn, ingress, false, true, sharedSecurity)
×
753
                                en.AddChild(subnet)
×
754
                        }
×
755
                } else {
1✔
756
                        // If ingress is an IP address
1✔
757
                        ip := net.ParseIP(ingress)
1✔
758
                        if ip != nil && ip.To4() != nil {
2✔
759
                                subnet := apicExtNetCreate(enDn, ingress, true, false, sharedSecurity)
1✔
760
                                en.AddChild(subnet)
1✔
761
                        } else if ip != nil && ip.To16() != nil {
1✔
762
                                subnet := apicExtNetCreate(enDn, ingress, false, false, sharedSecurity)
×
763
                                en.AddChild(subnet)
×
764
                        }
×
765
                }
766
        }
767
        return en
1✔
768
}
769

770
func apicDefaultEgCons(conName string, tenantName string,
771
        appProfile string, epg string) apicapi.ApicObject {
×
772
        enDn := fmt.Sprintf("uni/tn-%s/ap-%s/epg-%s", tenantName, appProfile, epg)
×
773
        return apicapi.NewFvRsCons(enDn, conName)
×
774
}
×
775

776
func apicExtNetCons(conName string, tenantName string,
777
        l3Out string, net string) apicapi.ApicObject {
1✔
778
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
779
        return apicapi.NewFvRsCons(enDn, conName)
1✔
780
}
1✔
781

782
func apicExtNetProv(conName string, tenantName string,
783
        l3Out string, net string) apicapi.ApicObject {
1✔
784
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
785
        return apicapi.NewFvRsProv(enDn, conName)
1✔
786
}
1✔
787

788
// stringInSlice reports whether str is equal to at least one element of
// list. A nil or empty list never matches.
func stringInSlice(str string, list []string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == str {
			return true
		}
	}
	return false
}
797

798
func validScope(scope string) bool {
1✔
799
        validValues := []string{"", "context", "tenant", "global"}
1✔
800
        return stringInSlice(scope, validValues)
1✔
801
}
1✔
802

803
func (cont *AciController) getGraphNameFromContract(name, tenantName string) (string, error) {
×
804
        var graphName string
×
805
        args := []string{
×
806
                "query-target=subtree",
×
807
        }
×
808
        url := fmt.Sprintf("/api/node/mo/uni/tn-%s/brc-%s.json?%s", tenantName, name, strings.Join(args, "&"))
×
809
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
810
        if err != nil {
×
811
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
812
                return graphName, err
×
813
        }
×
814
        for _, obj := range apicresp.Imdata {
×
815
                for class, body := range obj {
×
816
                        if class == "vzRsSubjGraphAtt" {
×
817
                                tnVnsAbsGraphName, ok := body.Attributes["tnVnsAbsGraphName"].(string)
×
818
                                if ok {
×
819
                                        graphName = tnVnsAbsGraphName
×
820
                                }
×
821
                                break
×
822
                        }
823
                }
824
        }
825
        cont.log.Debug("graphName: ", graphName)
×
826
        return graphName, err
×
827
}
828

829
func apicContract(conName string, tenantName string,
830
        graphName string, scopeName string, isSnatPbrFltrChain bool,
831
        customSGAnnot bool) apicapi.ApicObject {
1✔
832
        con := apicapi.NewVzBrCP(tenantName, conName)
1✔
833
        if scopeName != "" && scopeName != "context" {
2✔
834
                con.SetAttr("scope", scopeName)
1✔
835
        }
1✔
836
        cs := apicapi.NewVzSubj(con.GetDn(), "loadbalancedservice")
1✔
837
        csDn := cs.GetDn()
1✔
838
        if isSnatPbrFltrChain {
2✔
839
                cs.SetAttr("revFltPorts", "no")
1✔
840
                inTerm := apicapi.NewVzInTerm(csDn)
1✔
841
                outTerm := apicapi.NewVzOutTerm(csDn)
1✔
842
                inTerm.AddChild(apicapi.NewVzRsInTermGraphAtt(inTerm.GetDn(), graphName))
1✔
843
                inTerm.AddChild(apicapi.NewVzRsFiltAtt(inTerm.GetDn(), conName+"_fromCons-toProv"))
1✔
844
                outTerm.AddChild(apicapi.NewVzRsOutTermGraphAtt(outTerm.GetDn(), graphName))
1✔
845
                outTerm.AddChild(apicapi.NewVzRsFiltAtt(outTerm.GetDn(), conName+"_fromProv-toCons"))
1✔
846
                cs.AddChild(inTerm)
1✔
847
                cs.AddChild(outTerm)
1✔
848
        } else {
2✔
849
                cs.AddChild(apicapi.NewVzRsSubjGraphAtt(csDn, graphName, customSGAnnot))
1✔
850
                cs.AddChild(apicapi.NewVzRsSubjFiltAtt(csDn, conName))
1✔
851
        }
1✔
852
        con.AddChild(cs)
1✔
853
        return con
1✔
854
}
855

856
func apicDevCtx(name string, tenantName string,
857
        graphName string, deviceName string, bdName string, rpDn string, isSnatPbrFltrChain bool) apicapi.ApicObject {
1✔
858
        cc := apicapi.NewVnsLDevCtx(tenantName, name, graphName, "loadbalancer")
1✔
859
        ccDn := cc.GetDn()
1✔
860
        graphDn := fmt.Sprintf("uni/tn-%s/lDevVip-%s", tenantName, deviceName)
1✔
861
        lifDn := fmt.Sprintf("%s/lIf-%s", graphDn, "interface")
1✔
862
        bdDn := fmt.Sprintf("uni/tn-%s/BD-%s", tenantName, bdName)
1✔
863
        cc.AddChild(apicapi.NewVnsRsLDevCtxToLDev(ccDn, graphDn))
1✔
864
        rpDnBase := rpDn
1✔
865
        for _, ctxConn := range []string{"consumer", "provider"} {
2✔
866
                lifCtx := apicapi.NewVnsLIfCtx(ccDn, ctxConn)
1✔
867
                if isSnatPbrFltrChain {
2✔
868
                        if ctxConn == "consumer" {
2✔
869
                                rpDn = rpDnBase + "_Cons"
1✔
870
                        } else {
2✔
871
                                rpDn = rpDnBase + "_Prov"
1✔
872
                        }
1✔
873
                }
874
                lifCtxDn := lifCtx.GetDn()
1✔
875
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToSvcRedirectPol(lifCtxDn, rpDn))
1✔
876
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToBD(lifCtxDn, bdDn))
1✔
877
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToLIf(lifCtxDn, lifDn))
1✔
878
                cc.AddChild(lifCtx)
1✔
879
        }
880
        return cc
1✔
881
}
882

883
func apicFilterEntry(filterDn string, count string, p_start string,
884
        p_end string, protocol string, stateful string, snat bool, outTerm bool) apicapi.ApicObject {
1✔
885
        fe := apicapi.NewVzEntry(filterDn, count)
1✔
886
        fe.SetAttr("etherT", "ip")
1✔
887
        fe.SetAttr("prot", protocol)
1✔
888
        if snat {
2✔
889
                if outTerm {
2✔
890
                        if protocol == "tcp" {
2✔
891
                                fe.SetAttr("tcpRules", "est")
1✔
892
                        }
1✔
893
                        // Reverse the ports for outTerm
894
                        fe.SetAttr("dFromPort", p_start)
1✔
895
                        fe.SetAttr("dToPort", p_end)
1✔
896
                } else {
1✔
897
                        fe.SetAttr("sFromPort", p_start)
1✔
898
                        fe.SetAttr("sToPort", p_end)
1✔
899
                }
1✔
900
        } else {
1✔
901
                fe.SetAttr("dFromPort", p_start)
1✔
902
                fe.SetAttr("dToPort", p_end)
1✔
903
        }
1✔
904
        fe.SetAttr("stateful", stateful)
1✔
905
        return fe
1✔
906
}
907
func apicFilter(name string, tenantName string,
908
        portSpec []v1.ServicePort, snat bool, snatRange portRangeSnat) apicapi.ApicObject {
1✔
909
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
910
        filterDn := filter.GetDn()
1✔
911

1✔
912
        var i int
1✔
913
        var port v1.ServicePort
1✔
914
        for i, port = range portSpec {
2✔
915
                pstr := strconv.Itoa(int(port.Port))
1✔
916
                proto := getProtocolStr(port.Protocol)
1✔
917
                fe := apicFilterEntry(filterDn, strconv.Itoa(i), pstr,
1✔
918
                        pstr, proto, "no", false, false)
1✔
919
                filter.AddChild(fe)
1✔
920
        }
1✔
921

922
        if snat {
1✔
923
                portSpec := []portRangeSnat{snatRange}
×
924
                p_start := strconv.Itoa(portSpec[0].start)
×
925
                p_end := strconv.Itoa(portSpec[0].end)
×
926

×
927
                fe1 := apicFilterEntry(filterDn, strconv.Itoa(i+1), p_start,
×
928
                        p_end, "tcp", "no", false, false)
×
929
                filter.AddChild(fe1)
×
930
                fe2 := apicFilterEntry(filterDn, strconv.Itoa(i+2), p_start,
×
931
                        p_end, "udp", "no", false, false)
×
932
                filter.AddChild(fe2)
×
933
        }
×
934
        return filter
1✔
935
}
936

937
func apicFilterSnat(name string, tenantName string,
938
        portSpec []portRangeSnat, outTerm bool) apicapi.ApicObject {
1✔
939
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
940
        filterDn := filter.GetDn()
1✔
941

1✔
942
        p_start := strconv.Itoa(portSpec[0].start)
1✔
943
        p_end := strconv.Itoa(portSpec[0].end)
1✔
944

1✔
945
        fe := apicFilterEntry(filterDn, "0", p_start,
1✔
946
                p_end, "tcp", "no", true, outTerm)
1✔
947
        filter.AddChild(fe)
1✔
948
        fe1 := apicFilterEntry(filterDn, "1", p_start,
1✔
949
                p_end, "udp", "no", true, outTerm)
1✔
950
        filter.AddChild(fe1)
1✔
951

1✔
952
        return filter
1✔
953
}
1✔
954

955
func (cont *AciController) updateServiceDeviceInstance(key string,
956
        service *v1.Service) error {
1✔
957
        cont.indexMutex.Lock()
1✔
958
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
959
        cont.serviceEndPoints.GetnodesMetadata(key, service, nodeMap)
1✔
960
        cont.indexMutex.Unlock()
1✔
961

1✔
962
        var nodes []string
1✔
963
        for node := range nodeMap {
2✔
964
                nodes = append(nodes, node)
1✔
965
        }
1✔
966
        sort.Strings(nodes)
1✔
967
        name := cont.aciNameForKey("svc", key)
1✔
968
        var conScope string
1✔
969
        scopeVal, ok := service.ObjectMeta.Annotations[metadata.ServiceContractScopeAnnotation]
1✔
970
        if ok {
2✔
971
                normScopeVal := strings.ToLower(scopeVal)
1✔
972
                if !validScope(normScopeVal) {
1✔
973
                        errString := "Invalid service contract scope value provided " + scopeVal
×
974
                        err := errors.New(errString)
×
975
                        serviceLogger(cont.log, service).Error("Could not create contract: ", err)
×
976
                        return err
×
977
                } else {
1✔
978
                        conScope = normScopeVal
1✔
979
                }
1✔
980
        } else {
1✔
981
                conScope = DefaultServiceContractScope
1✔
982
        }
1✔
983

984
        var sharedSecurity bool
1✔
985
        if conScope == "global" {
2✔
986
                sharedSecurity = true
1✔
987
        } else {
2✔
988
                sharedSecurity = DefaultServiceExtSubNetShared
1✔
989
        }
1✔
990

991
        graphName := cont.aciNameForKey("svc", "global")
1✔
992
        deviceName := cont.aciNameForKey("svc", "global")
1✔
993
        _, customSGAnnPresent := service.ObjectMeta.Annotations[metadata.ServiceGraphNameAnnotation]
1✔
994
        if customSGAnnPresent {
1✔
995
                customSG, err := cont.getGraphNameFromContract(name, cont.config.AciVrfTenant)
×
996
                if err == nil {
×
997
                        graphName = customSG
×
998
                }
×
999
        }
1000
        cont.log.Debug("Using service graph ", graphName, " for service ", key)
1✔
1001

1✔
1002
        var serviceObjs apicapi.ApicSlice
1✔
1003
        if len(nodes) > 0 {
2✔
1004
                // 1. Service redirect policy
1✔
1005
                // The service redirect policy contains the MAC address
1✔
1006
                // and IP address of each of the service endpoints for
1✔
1007
                // each node that hosts a pod for this service.  The
1✔
1008
                // example below shows the case of two nodes.
1✔
1009
                rp, rpDn :=
1✔
1010
                        cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
1✔
1011
                                nodeMap, cont.staticMonPolDn(), cont.config.AciPbrTrackingNonSnat)
1✔
1012
                serviceObjs = append(serviceObjs, rp)
1✔
1013

1✔
1014
                // 2. Service graph contract and external network
1✔
1015
                // The service graph contract must be bound to the service
1✔
1016
                // graph.  This contract must be consumed by the default
1✔
1017
                // layer 3 network and provided by the service layer 3
1✔
1018
                // network.
1✔
1019
                {
2✔
1020
                        var ingresses []string
1✔
1021
                        for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1022
                                ingresses = append(ingresses, ingress.IP)
1✔
1023
                        }
1✔
1024
                        serviceObjs = append(serviceObjs,
1✔
1025
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1026
                                        cont.config.AciL3Out, ingresses, sharedSecurity, false))
1✔
1027
                }
1028

1029
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, false, customSGAnnPresent)
1✔
1030
                serviceObjs = append(serviceObjs, contract)
1✔
1031
                for _, net := range cont.config.AciExtNetworks {
2✔
1032
                        serviceObjs = append(serviceObjs,
1✔
1033
                                apicExtNetCons(name, cont.config.AciVrfTenant,
1✔
1034
                                        cont.config.AciL3Out, net))
1✔
1035
                }
1✔
1036

1037
                if cont.config.AddExternalContractToDefaultEPG && service.Spec.Type == v1.ServiceTypeLoadBalancer {
1✔
1038
                        defaultEpgTenant := cont.config.DefaultEg.PolicySpace
×
1039
                        defaultEpgStringSplit := strings.Split(cont.config.DefaultEg.Name, "|")
×
1040
                        var defaultEpgName, appProfile string
×
1041
                        if len(defaultEpgStringSplit) > 1 {
×
1042
                                appProfile = defaultEpgStringSplit[0]
×
1043
                                defaultEpgName = defaultEpgStringSplit[1]
×
1044
                        } else {
×
1045
                                appProfile = cont.config.AppProfile
×
1046
                                defaultEpgName = defaultEpgStringSplit[0]
×
1047
                        }
×
1048
                        serviceObjs = append(serviceObjs,
×
1049
                                apicDefaultEgCons(name, defaultEpgTenant, appProfile, defaultEpgName))
×
1050
                }
1051

1052
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1053
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1054

1✔
1055
                _, snat := cont.snatServices[key]
1✔
1056
                filter := apicFilter(name, cont.config.AciVrfTenant,
1✔
1057
                        service.Spec.Ports, snat, defaultPortRange)
1✔
1058
                serviceObjs = append(serviceObjs, filter)
1✔
1059

1✔
1060
                // 3. Device cluster context
1✔
1061
                // The logical device context binds the service contract
1✔
1062
                // to the redirect policy and the device cluster and
1✔
1063
                // bridge domain for the device cluster.
1✔
1064
                serviceObjs = append(serviceObjs,
1✔
1065
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, deviceName,
1✔
1066
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, false))
1✔
1067
        }
1068

1069
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1070
        return nil
1✔
1071
}
1072

1073
func (cont *AciController) updateServiceDeviceInstanceSnat(key string) error {
1✔
1074
        nodeList := cont.nodeIndexer.List()
1✔
1075
        cont.indexMutex.Lock()
1✔
1076
        if len(cont.nodeServiceMetaCache) == 0 {
2✔
1077
                cont.indexMutex.Unlock()
1✔
1078
                return nil
1✔
1079
        }
1✔
1080
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
1081
        sort.Slice(nodeList, func(i, j int) bool {
2✔
1082
                nodeA := nodeList[i].(*v1.Node)
1✔
1083
                nodeB := nodeList[j].(*v1.Node)
1✔
1084
                return nodeA.ObjectMeta.Name < nodeB.ObjectMeta.Name
1✔
1085
        })
1✔
1086
        for itr, nodeItem := range nodeList {
2✔
1087
                if itr == cont.config.MaxSvcGraphNodes {
1✔
1088
                        break
×
1089
                }
1090
                node := nodeItem.(*v1.Node)
1✔
1091
                nodeName := node.ObjectMeta.Name
1✔
1092
                nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
1093
                if !ok {
2✔
1094
                        continue
1✔
1095
                }
1096
                _, ok = cont.fabricPathForNode(nodeName)
1✔
1097
                if !ok {
1✔
1098
                        continue
×
1099
                }
1100
                nodeLabels := node.ObjectMeta.Labels
1✔
1101
                excludeNode := cont.nodeLabelsInExcludeList(nodeLabels)
1✔
1102
                if excludeNode {
1✔
1103
                        continue
×
1104
                }
1105
                nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
1106
        }
1107
        cont.indexMutex.Unlock()
1✔
1108

1✔
1109
        var nodes []string
1✔
1110
        for node := range nodeMap {
2✔
1111
                nodes = append(nodes, node)
1✔
1112
        }
1✔
1113
        sort.Strings(nodes)
1✔
1114
        name := cont.aciNameForKey("snat", key)
1✔
1115
        var conScope = cont.config.SnatSvcContractScope
1✔
1116
        sharedSecurity := true
1✔
1117

1✔
1118
        graphName := cont.aciNameForKey("svc", "global")
1✔
1119
        var serviceObjs apicapi.ApicSlice
1✔
1120
        if len(nodes) > 0 {
2✔
1121
                // 1. Service redirect policy
1✔
1122
                // The service redirect policy contains the MAC address
1✔
1123
                // and IP address of each of the service endpoints for
1✔
1124
                // each node that hosts a pod for this service.
1✔
1125
                // For SNAT with the introduction of filter-chain usage, to work-around
1✔
1126
                // an APIC limitation, creating two PBR policies with same nodes.
1✔
1127
                var rpDn string
1✔
1128
                var rp apicapi.ApicObject
1✔
1129
                if cont.apicConn.SnatPbrFltrChain {
2✔
1130
                        rpCons, rpDnCons :=
1✔
1131
                                cont.apicRedirectPol(name+"_Cons", cont.config.AciVrfTenant, nodes,
1✔
1132
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1133
                        serviceObjs = append(serviceObjs, rpCons)
1✔
1134
                        rpProv, _ :=
1✔
1135
                                cont.apicRedirectPol(name+"_Prov", cont.config.AciVrfTenant, nodes,
1✔
1136
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1137
                        serviceObjs = append(serviceObjs, rpProv)
1✔
1138
                        rpDn = strings.TrimSuffix(rpDnCons, "_Cons")
1✔
1139
                } else {
1✔
1140
                        rp, rpDn =
×
1141
                                cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
×
1142
                                        nodeMap, cont.staticMonPolDn(), true)
×
1143
                        serviceObjs = append(serviceObjs, rp)
×
1144
                }
×
1145
                // 2. Service graph contract and external network
1146
                // The service graph contract must be bound to the
1147
                // service
1148
                // graph.  This contract must be consumed by the default
1149
                // layer 3 network and provided by the service layer 3
1150
                // network.
1151
                {
1✔
1152
                        var ingresses []string
1✔
1153
                        for _, policy := range cont.snatPolicyCache {
2✔
1154
                                ingresses = append(ingresses, policy.SnatIp...)
1✔
1155
                        }
1✔
1156
                        serviceObjs = append(serviceObjs,
1✔
1157
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1158
                                        cont.config.AciL3Out, ingresses, sharedSecurity, true))
1✔
1159
                }
1160

1161
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, cont.apicConn.SnatPbrFltrChain, false)
1✔
1162
                serviceObjs = append(serviceObjs, contract)
1✔
1163

1✔
1164
                for _, net := range cont.config.AciExtNetworks {
2✔
1165
                        serviceObjs = append(serviceObjs,
1✔
1166
                                apicExtNetProv(name, cont.config.AciVrfTenant,
1✔
1167
                                        cont.config.AciL3Out, net))
1✔
1168
                }
1✔
1169

1170
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1171
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1172
                portSpec := []portRangeSnat{defaultPortRange}
1✔
1173
                if cont.apicConn.SnatPbrFltrChain {
2✔
1174
                        filterIn := apicFilterSnat(name+"_fromCons-toProv", cont.config.AciVrfTenant, portSpec, false)
1✔
1175
                        serviceObjs = append(serviceObjs, filterIn)
1✔
1176
                        filterOut := apicFilterSnat(name+"_fromProv-toCons", cont.config.AciVrfTenant, portSpec, true)
1✔
1177
                        serviceObjs = append(serviceObjs, filterOut)
1✔
1178
                } else {
1✔
1179
                        filter := apicFilterSnat(name, cont.config.AciVrfTenant, portSpec, false)
×
1180
                        serviceObjs = append(serviceObjs, filter)
×
1181
                }
×
1182
                // 3. Device cluster context
1183
                // The logical device context binds the service contract
1184
                // to the redirect policy and the device cluster and
1185
                // bridge domain for the device cluster.
1186
                serviceObjs = append(serviceObjs,
1✔
1187
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, graphName,
1✔
1188
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, cont.apicConn.SnatPbrFltrChain))
1✔
1189
        }
1190
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1191
        return nil
1✔
1192
}
1193

1194
func (cont *AciController) nodeLabelsInExcludeList(Labels map[string]string) bool {
1✔
1195
        nodeSnatRedirectExclude := cont.config.NodeSnatRedirectExclude
1✔
1196

1✔
1197
        for _, nodeGroup := range nodeSnatRedirectExclude {
1✔
1198
                if len(nodeGroup.Labels) == 0 {
×
1199
                        continue
×
1200
                }
1201
                matchFound := true
×
1202
                for _, label := range nodeGroup.Labels {
×
1203
                        if _, ok := Labels["node-role.kubernetes.io/"+label]; !ok {
×
1204
                                matchFound = false
×
1205
                                break
×
1206
                        }
1207
                }
1208
                if matchFound {
×
1209
                        return true
×
1210
                }
×
1211
        }
1212
        return false
1✔
1213
}
1214

1215
func (cont *AciController) queueServiceUpdateByKey(key string) {
1✔
1216
        cont.serviceQueue.Add(key)
1✔
1217
}
1✔
1218

1219
func (cont *AciController) queueServiceUpdate(service *v1.Service) {
1✔
1220
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
1221
        if err != nil {
1✔
1222
                serviceLogger(cont.log, service).
×
1223
                        Error("Could not create service key: ", err)
×
1224
                return
×
1225
        }
×
1226
        cont.serviceQueue.Add(key)
1✔
1227
}
1228

1229
func apicDeviceCluster(name string, vrfTenant string,
1230
        physDom string, encap string,
1231
        nodes []string, nodeMap map[string]string) (apicapi.ApicObject, string) {
1✔
1232
        dc := apicapi.NewVnsLDevVip(vrfTenant, name)
1✔
1233
        dc.SetAttr("managed", "no")
1✔
1234
        dcDn := dc.GetDn()
1✔
1235
        dc.AddChild(apicapi.NewVnsRsALDevToPhysDomP(dcDn,
1✔
1236
                fmt.Sprintf("uni/phys-%s", physDom)))
1✔
1237
        lif := apicapi.NewVnsLIf(dcDn, "interface")
1✔
1238
        lif.SetAttr("encap", encap)
1✔
1239
        lifDn := lif.GetDn()
1✔
1240

1✔
1241
        for _, node := range nodes {
2✔
1242
                path, ok := nodeMap[node]
1✔
1243
                if !ok {
1✔
1244
                        continue
×
1245
                }
1246

1247
                cdev := apicapi.NewVnsCDev(dcDn, node)
1✔
1248
                cif := apicapi.NewVnsCif(cdev.GetDn(), "interface")
1✔
1249
                cif.AddChild(apicapi.NewVnsRsCIfPathAtt(cif.GetDn(), path))
1✔
1250
                cdev.AddChild(cif)
1✔
1251
                lif.AddChild(apicapi.NewVnsRsCIfAttN(lifDn, cif.GetDn()))
1✔
1252
                dc.AddChild(cdev)
1✔
1253
        }
1254

1255
        dc.AddChild(lif)
1✔
1256

1✔
1257
        return dc, dcDn
1✔
1258
}
1259

1260
func apicServiceGraph(name string, tenantName string,
1261
        dcDn string) apicapi.ApicObject {
1✔
1262
        sg := apicapi.NewVnsAbsGraph(tenantName, name)
1✔
1263
        sgDn := sg.GetDn()
1✔
1264
        var provDn string
1✔
1265
        var consDn string
1✔
1266
        var cTermDn string
1✔
1267
        var pTermDn string
1✔
1268
        {
2✔
1269
                an := apicapi.NewVnsAbsNode(sgDn, "loadbalancer")
1✔
1270
                an.SetAttr("managed", "no")
1✔
1271
                an.SetAttr("routingMode", "Redirect")
1✔
1272
                anDn := an.GetDn()
1✔
1273
                cons := apicapi.NewVnsAbsFuncConn(anDn, "consumer")
1✔
1274
                consDn = cons.GetDn()
1✔
1275
                an.AddChild(cons)
1✔
1276
                prov := apicapi.NewVnsAbsFuncConn(anDn, "provider")
1✔
1277
                provDn = prov.GetDn()
1✔
1278
                an.AddChild(prov)
1✔
1279
                an.AddChild(apicapi.NewVnsRsNodeToLDev(anDn, dcDn))
1✔
1280
                sg.AddChild(an)
1✔
1281
        }
1✔
1282
        {
1✔
1283
                tnc := apicapi.NewVnsAbsTermNodeCon(sgDn, "T1")
1✔
1284
                tncDn := tnc.GetDn()
1✔
1285
                cTerm := apicapi.NewVnsAbsTermConn(tncDn)
1✔
1286
                cTermDn = cTerm.GetDn()
1✔
1287
                tnc.AddChild(cTerm)
1✔
1288
                tnc.AddChild(apicapi.NewVnsInTerm(tncDn))
1✔
1289
                tnc.AddChild(apicapi.NewVnsOutTerm(tncDn))
1✔
1290
                sg.AddChild(tnc)
1✔
1291
        }
1✔
1292
        {
1✔
1293
                tnp := apicapi.NewVnsAbsTermNodeProv(sgDn, "T2")
1✔
1294
                tnpDn := tnp.GetDn()
1✔
1295
                pTerm := apicapi.NewVnsAbsTermConn(tnpDn)
1✔
1296
                pTermDn = pTerm.GetDn()
1✔
1297
                tnp.AddChild(pTerm)
1✔
1298
                tnp.AddChild(apicapi.NewVnsInTerm(tnpDn))
1✔
1299
                tnp.AddChild(apicapi.NewVnsOutTerm(tnpDn))
1✔
1300
                sg.AddChild(tnp)
1✔
1301
        }
1✔
1302
        {
1✔
1303
                acc := apicapi.NewVnsAbsConnection(sgDn, "C1")
1✔
1304
                acc.SetAttr("connDir", "provider")
1✔
1305
                accDn := acc.GetDn()
1✔
1306
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, consDn))
1✔
1307
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, cTermDn))
1✔
1308
                sg.AddChild(acc)
1✔
1309
        }
1✔
1310
        {
1✔
1311
                acp := apicapi.NewVnsAbsConnection(sgDn, "C2")
1✔
1312
                acp.SetAttr("connDir", "provider")
1✔
1313
                acpDn := acp.GetDn()
1✔
1314
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, provDn))
1✔
1315
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, pTermDn))
1✔
1316
                sg.AddChild(acp)
1✔
1317
        }
1✔
1318
        return sg
1✔
1319
}
1320
func (cont *AciController) updateDeviceCluster() {
1✔
1321
        nodeMap := make(map[string]string)
1✔
1322
        cont.indexMutex.Lock()
1✔
1323
        for node := range cont.nodeOpflexDevice {
2✔
1324
                cont.log.Debug("Processing node in nodeOpflexDevice cache : ", node)
1✔
1325
                fabricPath, ok := cont.fabricPathForNode(node)
1✔
1326
                if !ok {
2✔
1327
                        continue
1✔
1328
                }
1329
                nodeMap[node] = fabricPath
1✔
1330
        }
1331

1332
        // For clusters other than OpenShift On OpenStack,
1333
        // openStackFabricPathDnMap will be empty
1334
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1335
                nodeMap[host] = opflexOdevInfo.fabricPathDn
×
1336
        }
×
1337

1338
        // For OpenShift On OpenStack clusters,
1339
        // hostFabricPathDnMap will be empty
1340
        for _, hostInfo := range cont.hostFabricPathDnMap {
1✔
1341
                if hostInfo.fabricPathDn != "" {
×
1342
                        nodeMap[hostInfo.host] = hostInfo.fabricPathDn
×
1343
                }
×
1344
        }
1345
        cont.indexMutex.Unlock()
1✔
1346

1✔
1347
        var nodes []string
1✔
1348
        for node := range nodeMap {
2✔
1349
                nodes = append(nodes, node)
1✔
1350
        }
1✔
1351
        sort.Strings(nodes)
1✔
1352

1✔
1353
        name := cont.aciNameForKey("svc", "global")
1✔
1354
        var serviceObjs apicapi.ApicSlice
1✔
1355

1✔
1356
        // 1. Device cluster:
1✔
1357
        // The device cluster is a set of physical paths that need to be
1✔
1358
        // created for each node in the cluster, that correspond to the
1✔
1359
        // service interface for each node.
1✔
1360
        dc, dcDn := apicDeviceCluster(name, cont.config.AciVrfTenant,
1✔
1361
                cont.config.AciServicePhysDom, cont.config.AciServiceEncap,
1✔
1362
                nodes, nodeMap)
1✔
1363
        serviceObjs = append(serviceObjs, dc)
1✔
1364

1✔
1365
        // 2. Service graph template
1✔
1366
        // The service graph controls how the traffic will be redirected.
1✔
1367
        // A service graph must be created for each device cluster.
1✔
1368
        serviceObjs = append(serviceObjs,
1✔
1369
                apicServiceGraph(name, cont.config.AciVrfTenant, dcDn))
1✔
1370

1✔
1371
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1372
}
1373

1374
func (cont *AciController) fabricPathLogger(node string,
1375
        obj apicapi.ApicObject) *logrus.Entry {
1✔
1376
        return cont.log.WithFields(logrus.Fields{
1✔
1377
                "fabricPath": obj.GetAttr("fabricPathDn"),
1✔
1378
                "mac":        obj.GetAttr("mac"),
1✔
1379
                "node":       node,
1✔
1380
                "obj":        obj,
1✔
1381
        })
1✔
1382
}
1✔
1383

1384
func (cont *AciController) setOpenStackSystemId() string {
×
1385

×
1386
        // 1) get opflexIDEp with containerName == <node name of any one of the openshift nodes>
×
1387
        // 2) extract OpenStack system id from compHvDn attribute
×
1388
        //    comp/prov-OpenStack/ctrlr-[k8s-scale]-k8s-scale/hv-overcloud-novacompute-0 - sample compHvDn,
×
1389
        //    where k8s-scale is the system id
×
1390

×
1391
        var systemId string
×
1392
        nodeList := cont.nodeIndexer.List()
×
1393
        if len(nodeList) < 1 {
×
1394
                return systemId
×
1395
        }
×
1396
        node := nodeList[0].(*v1.Node)
×
1397
        nodeName := node.ObjectMeta.Name
×
1398
        opflexIDEpFilter := fmt.Sprintf("query-target-filter=and(eq(opflexIDEp.containerName,\"%s\"))", nodeName)
×
1399
        opflexIDEpArgs := []string{
×
1400
                opflexIDEpFilter,
×
1401
        }
×
1402
        url := fmt.Sprintf("/api/node/class/opflexIDEp.json?%s", strings.Join(opflexIDEpArgs, "&"))
×
1403
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1404
        if err != nil {
×
1405
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1406
                return systemId
×
1407
        }
×
1408
        for _, obj := range apicresp.Imdata {
×
1409
                for _, body := range obj {
×
1410
                        compHvDn, ok := body.Attributes["compHvDn"].(string)
×
1411
                        if ok {
×
1412
                                systemId = compHvDn[strings.IndexByte(compHvDn, '[')+1 : strings.IndexByte(compHvDn, ']')]
×
1413
                                break
×
1414
                        }
1415
                }
1416
        }
1417
        cont.indexMutex.Lock()
×
1418
        cont.openStackSystemId = systemId
×
1419
        cont.log.Info("Setting OpenStack system id : ", cont.openStackSystemId)
×
1420
        cont.indexMutex.Unlock()
×
1421
        return systemId
×
1422
}
1423

1424
// Returns true when a new OpenStack opflexODev is added
1425
func (cont *AciController) openStackOpflexOdevUpdate(obj apicapi.ApicObject) bool {
×
1426

×
1427
        // If opflexOdev compHvDn contains comp/prov-OpenShift/ctrlr-[<systemid>]-<systemid>,
×
1428
        // it means that it is an OpenStack OpflexOdev which belongs to OpenStack with system id <systemid>
×
1429

×
1430
        var deviceClusterUpdate bool
×
1431
        compHvDn := obj.GetAttrStr("compHvDn")
×
1432
        if strings.Contains(compHvDn, "prov-OpenStack") {
×
1433
                cont.indexMutex.Lock()
×
1434
                systemId := cont.openStackSystemId
×
1435
                cont.indexMutex.Unlock()
×
1436
                if systemId == "" {
×
1437
                        systemId = cont.setOpenStackSystemId()
×
1438
                }
×
1439
                if systemId == "" {
×
1440
                        cont.log.Error("Failed  to get OpenStack system id")
×
1441
                        return deviceClusterUpdate
×
1442
                }
×
1443
                prefix := fmt.Sprintf("comp/prov-OpenStack/ctrlr-[%s]-%s", systemId, systemId)
×
1444
                if strings.Contains(compHvDn, prefix) {
×
1445
                        cont.log.Info("Received notification for OpenStack opflexODev update, hostName: ",
×
1446
                                obj.GetAttrStr("hostName"), " dn: ", obj.GetAttrStr("dn"))
×
1447
                        cont.indexMutex.Lock()
×
1448
                        opflexOdevInfo, ok := cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")]
×
1449
                        if ok {
×
1450
                                opflexOdevInfo.opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1451
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = opflexOdevInfo
×
1452
                        } else {
×
1453
                                var openstackopflexodevinfo openstackOpflexOdevInfo
×
1454
                                opflexODevDn := make(map[string]struct{})
×
1455
                                opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1456
                                openstackopflexodevinfo.fabricPathDn = obj.GetAttrStr("fabricPathDn")
×
1457
                                openstackopflexodevinfo.opflexODevDn = opflexODevDn
×
1458
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = openstackopflexodevinfo
×
1459
                                deviceClusterUpdate = true
×
1460
                        }
×
1461
                        cont.indexMutex.Unlock()
×
1462
                }
1463
        }
1464
        return deviceClusterUpdate
×
1465
}
1466

1467
func (cont *AciController) infraRtAttEntPDeleted(dn string) {
×
1468
        // dn format : uni/infra/attentp-k8s-scale-esxi-aaep/rtattEntP-[uni/infra/funcprof/accbundle-esxi1-vpc-ipg]
×
1469
        cont.log.Info("Processing delete of infraRtAttEntP: ", dn)
×
1470

×
1471
        // extract uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1472
        re := regexp.MustCompile(`\[(.*?)\]`)
×
1473
        matches := re.FindStringSubmatch(dn)
×
1474

×
1475
        if len(matches) < 2 {
×
1476
                cont.log.Error("Failed to extract ipg from dn : ", dn)
×
1477
                return
×
1478
        }
×
1479
        tdn := matches[1]
×
1480

×
1481
        cont.indexMutex.Lock()
×
1482
        _, ok := cont.hostFabricPathDnMap[tdn]
×
1483
        if ok {
×
1484
                delete(cont.hostFabricPathDnMap, tdn)
×
1485
                cont.log.Info("Deleted ipg : ", tdn)
×
1486
        }
×
1487
        cont.indexMutex.Unlock()
×
1488

×
1489
        if ok {
×
1490
                cont.updateDeviceCluster()
×
1491
        }
×
1492
}
1493

1494
func (cont *AciController) vpcIfDeleted(dn string) {
×
1495
        var deleted bool
×
1496
        cont.indexMutex.Lock()
×
1497
        for tDn, hostInfo := range cont.hostFabricPathDnMap {
×
1498
                if _, present := hostInfo.vpcIfDn[dn]; present {
×
1499
                        cont.log.Info("Deleting vpcIf, dn :", dn)
×
1500
                        delete(hostInfo.vpcIfDn, dn)
×
1501
                        if len(hostInfo.vpcIfDn) == 0 {
×
1502
                                cont.log.Infof("Removing fabricPathDn(%s) of ipg : %s ", hostInfo.fabricPathDn, hostInfo.host)
×
1503
                                hostInfo.fabricPathDn = ""
×
1504
                                deleted = true
×
1505
                        }
×
1506
                        cont.hostFabricPathDnMap[tDn] = hostInfo
×
1507
                }
1508
        }
1509
        cont.indexMutex.Unlock()
×
1510
        if deleted {
×
1511
                cont.updateDeviceCluster()
×
1512
        }
×
1513
}
1514

1515
func (cont *AciController) vpcIfChanged(obj apicapi.ApicObject) {
×
1516
        if cont.updateHostFabricPathDnMap(obj) {
×
1517
                cont.updateDeviceCluster()
×
1518
        }
×
1519
}
1520

1521
func (cont *AciController) updateHostFabricPathDnMap(obj apicapi.ApicObject) bool {
×
1522
        var accBndlGrpDn, fabricPathDn, dn string
×
1523
        for _, body := range obj {
×
1524
                var ok bool
×
1525
                accBndlGrpDn, ok = body.Attributes["accBndlGrpDn"].(string)
×
1526
                if !ok || (ok && accBndlGrpDn == "") {
×
1527
                        cont.log.Error("accBndlGrpDn missing/empty in vpcIf")
×
1528
                        return false
×
1529
                }
×
1530
                fabricPathDn, ok = body.Attributes["fabricPathDn"].(string)
×
1531
                if !ok && (ok && fabricPathDn == "") {
×
1532
                        cont.log.Error("fabricPathDn missing/empty in vpcIf")
×
1533
                        return false
×
1534
                }
×
1535
                dn, ok = body.Attributes["dn"].(string)
×
1536
                if !ok && (ok && dn == "") {
×
1537
                        cont.log.Error("dn missing/empty in vpcIf")
×
1538
                        return false
×
1539
                }
×
1540
        }
1541
        var updated bool
×
1542
        cont.indexMutex.Lock()
×
1543
        // If accBndlGrpDn exists in hostFabricPathDnMap, the vpcIf belongs to the cluster AEP
×
1544
        hostInfo, exists := cont.hostFabricPathDnMap[accBndlGrpDn]
×
1545
        if exists {
×
1546
                if _, present := hostInfo.vpcIfDn[dn]; !present {
×
1547
                        hostInfo.vpcIfDn[dn] = struct{}{}
×
1548
                        cont.log.Infof("vpcIf processing, dn : %s, accBndlGrpDn: %s", dn, accBndlGrpDn)
×
1549
                }
×
1550
                if hostInfo.fabricPathDn != fabricPathDn {
×
1551
                        hostInfo.fabricPathDn = fabricPathDn
×
1552
                        cont.log.Info("Updated fabricPathDn of ipg :", hostInfo.host, " to: ", hostInfo.fabricPathDn)
×
1553
                        updated = true
×
1554
                }
×
1555
                cont.hostFabricPathDnMap[accBndlGrpDn] = hostInfo
×
1556
        }
1557
        cont.indexMutex.Unlock()
×
1558
        return updated
×
1559
}
1560

1561
func (cont *AciController) infraRtAttEntPChanged(obj apicapi.ApicObject) {
×
1562
        var tdn string
×
1563
        for _, body := range obj {
×
1564
                var ok bool
×
1565
                tdn, ok = body.Attributes["tDn"].(string)
×
1566
                if !ok || (ok && tdn == "") {
×
1567
                        cont.log.Error("tDn missing/empty in infraRtAttEntP")
×
1568
                        return
×
1569
                }
×
1570
        }
1571
        var updated bool
×
1572
        cont.log.Info("infraRtAttEntP updated, tDn : ", tdn)
×
1573

×
1574
        // tdn format for vpc : /uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1575
        // tdn format for single leaf : /uni/infra/funcprof/accportgrp-IPG_CLIENT_SIM
×
1576

×
1577
        // Ignore processing of single leaf
×
1578
        if !strings.Contains(tdn, "/accbundle-") {
×
1579
                cont.log.Info("Skipping processing of infraRtAttEntP update, not applicable for non-VPC configuration: ", tdn)
×
1580
                return
×
1581
        }
×
1582

1583
        // extract esxi1-vpc-ipg
1584
        parts := strings.Split(tdn, "/")
×
1585
        lastPart := parts[len(parts)-1]
×
1586
        host := strings.TrimPrefix(lastPart, "accbundle-")
×
1587

×
1588
        // adding entry for ipg in hostFabricPathDnMap
×
1589
        cont.indexMutex.Lock()
×
1590
        _, exists := cont.hostFabricPathDnMap[tdn]
×
1591
        if !exists {
×
1592
                var hostInfo hostFabricInfo
×
1593
                hostInfo.host = host
×
1594
                hostInfo.vpcIfDn = make(map[string]struct{})
×
1595
                cont.hostFabricPathDnMap[tdn] = hostInfo
×
1596
        }
×
1597
        cont.indexMutex.Unlock()
×
1598

×
1599
        accBndlGrpFilter := fmt.Sprintf(`query-target-filter=and(eq(vpcIf.accBndlGrpDn,"%s"))`, tdn)
×
1600
        url := fmt.Sprintf("/api/class/vpcIf.json?%s", accBndlGrpFilter)
×
1601
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1602
        if err != nil {
×
1603
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1604
                return
×
1605
        }
×
1606

1607
        for _, obj := range apicresp.Imdata {
×
1608
                if cont.updateHostFabricPathDnMap(obj) && !updated {
×
1609
                        updated = true
×
1610
                }
×
1611
        }
1612

1613
        if updated {
×
1614
                cont.updateDeviceCluster()
×
1615
        }
×
1616
        return
×
1617
}
1618

1619
func (cont *AciController) opflexDeviceChanged(obj apicapi.ApicObject) {
1✔
1620
        devType := obj.GetAttrStr("devType")
1✔
1621
        domName := obj.GetAttrStr("domName")
1✔
1622
        ctrlrName := obj.GetAttrStr("ctrlrName")
1✔
1623

1✔
1624
        if !cont.config.DisableServiceVlanPreprovisioning && strings.Contains(cont.config.Flavor, "openstack") {
1✔
1625
                if cont.openStackOpflexOdevUpdate(obj) {
×
1626
                        cont.log.Info("OpenStack opflexODev for ", obj.GetAttrStr("hostName"), " is added")
×
1627
                        cont.updateDeviceCluster()
×
1628
                }
×
1629
        }
1630
        if (devType == cont.env.OpFlexDeviceType()) && (domName == cont.config.AciVmmDomain) && (ctrlrName == cont.config.AciVmmController) {
2✔
1631
                cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Processing opflex device update")
1✔
1632
                if obj.GetAttrStr("state") == "disconnected" {
2✔
1633
                        cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Opflex device disconnected")
1✔
1634
                        cont.indexMutex.Lock()
1✔
1635
                        for node, devices := range cont.nodeOpflexDevice {
1✔
1636
                                if node == obj.GetAttrStr("hostName") {
×
1637
                                        for _, device := range devices {
×
1638
                                                if device.GetDn() == obj.GetDn() {
×
1639
                                                        device.SetAttr("state", "disconnected")
×
1640
                                                        cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Debug("Opflex device cache updated for disconnected node")
×
1641
                                                }
×
1642
                                        }
1643
                                        cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", devices)
×
1644
                                        break
×
1645
                                }
1646
                        }
1647
                        cont.indexMutex.Unlock()
1✔
1648
                        cont.updateDeviceCluster()
1✔
1649
                        return
1✔
1650
                }
1651
                var nodeUpdates []string
1✔
1652

1✔
1653
                cont.indexMutex.Lock()
1✔
1654
                nodefound := false
1✔
1655
                for node, devices := range cont.nodeOpflexDevice {
2✔
1656
                        found := false
1✔
1657

1✔
1658
                        if node == obj.GetAttrStr("hostName") {
2✔
1659
                                nodefound = true
1✔
1660
                        }
1✔
1661

1662
                        for i, device := range devices {
2✔
1663
                                if device.GetDn() != obj.GetDn() {
2✔
1664
                                        continue
1✔
1665
                                }
1666
                                found = true
1✔
1667

1✔
1668
                                if obj.GetAttrStr("hostName") != node {
2✔
1669
                                        cont.fabricPathLogger(node, device).
1✔
1670
                                                Debug("Moving opflex device from node")
1✔
1671

1✔
1672
                                        devices = append(devices[:i], devices[i+1:]...)
1✔
1673
                                        cont.nodeOpflexDevice[node] = devices
1✔
1674
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1675
                                        break
1✔
1676
                                } else if (device.GetAttrStr("mac") != obj.GetAttrStr("mac")) ||
1✔
1677
                                        (device.GetAttrStr("fabricPathDn") != obj.GetAttrStr("fabricPathDn")) ||
1✔
1678
                                        (device.GetAttrStr("state") != obj.GetAttrStr("state")) {
2✔
1679
                                        cont.fabricPathLogger(node, obj).
1✔
1680
                                                Debug("Updating opflex device")
1✔
1681

1✔
1682
                                        devices = append(append(devices[:i], devices[i+1:]...), obj)
1✔
1683
                                        cont.nodeOpflexDevice[node] = devices
1✔
1684
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1685
                                        break
1✔
1686
                                }
1687
                        }
1688
                        if !found && obj.GetAttrStr("hostName") == node {
2✔
1689
                                cont.fabricPathLogger(node, obj).
1✔
1690
                                        Debug("Appending opflex device")
1✔
1691

1✔
1692
                                devices = append(devices, obj)
1✔
1693
                                cont.nodeOpflexDevice[node] = devices
1✔
1694
                                nodeUpdates = append(nodeUpdates, node)
1✔
1695
                        }
1✔
1696
                }
1697
                if !nodefound {
2✔
1698
                        node := obj.GetAttrStr("hostName")
1✔
1699
                        cont.fabricPathLogger(node, obj).Debug("Adding opflex device")
1✔
1700
                        cont.nodeOpflexDevice[node] = apicapi.ApicSlice{obj}
1✔
1701
                        nodeUpdates = append(nodeUpdates, node)
1✔
1702
                }
1✔
1703
                cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", cont.nodeOpflexDevice[obj.GetAttrStr("hostName")])
1✔
1704
                cont.indexMutex.Unlock()
1✔
1705

1✔
1706
                for _, node := range nodeUpdates {
2✔
1707
                        cont.env.NodeServiceChanged(node)
1✔
1708
                        cont.erspanSyncOpflexDev()
1✔
1709
                }
1✔
1710
                cont.updateDeviceCluster()
1✔
1711
        }
1712
}
1713

1714
func (cont *AciController) postOpflexDeviceDelete(nodes []string) {
1✔
1715
        cont.updateDeviceCluster()
1✔
1716
        for _, node := range nodes {
2✔
1717
                cont.env.NodeServiceChanged(node)
1✔
1718
                cont.erspanSyncOpflexDev()
1✔
1719
        }
1✔
1720
}
1721

1722
func (cont *AciController) opflexDeviceDeleted(dn string) {
1✔
1723
        var nodeUpdates []string
1✔
1724
        var dnFound bool //to check if the dn belongs to this cluster
1✔
1725
        cont.log.Info("Processing opflex device delete notification of ", dn)
1✔
1726
        cont.indexMutex.Lock()
1✔
1727
        for node, devices := range cont.nodeOpflexDevice {
2✔
1728
                for i, device := range devices {
2✔
1729
                        if device.GetDn() != dn {
2✔
1730
                                continue
1✔
1731
                        }
1732
                        dnFound = true
1✔
1733
                        cont.fabricPathLogger(node, device).
1✔
1734
                                Debug("Deleting opflex device path")
1✔
1735
                        devices = append(devices[:i], devices[i+1:]...)
1✔
1736
                        cont.nodeOpflexDevice[node] = devices
1✔
1737
                        cont.log.Info("Deleted opflex device of node ", node, ": ", dn)
1✔
1738
                        nodeUpdates = append(nodeUpdates, node)
1✔
1739
                        break
1✔
1740
                }
1741
                if len(devices) == 0 {
2✔
1742
                        delete(cont.nodeOpflexDevice, node)
1✔
1743
                }
1✔
1744
        }
1745

1746
        // For clusters other than OpenShift On OpenStack,
1747
        // openStackFabricPathDnMap will be empty
1748
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1749
                if _, ok := opflexOdevInfo.opflexODevDn[dn]; ok {
×
1750
                        cont.log.Info("Received OpenStack opflexODev delete notification for ", dn)
×
1751
                        delete(opflexOdevInfo.opflexODevDn, dn)
×
1752
                        if len(opflexOdevInfo.opflexODevDn) < 1 {
×
1753
                                delete(cont.openStackFabricPathDnMap, host)
×
1754
                                cont.log.Info("OpenStack opflexODev of host ", host, " is deleted from cache")
×
1755
                                dnFound = true
×
1756
                        } else {
×
1757
                                cont.openStackFabricPathDnMap[host] = opflexOdevInfo
×
1758
                        }
×
1759
                        break
×
1760
                }
1761
        }
1762
        cont.indexMutex.Unlock()
1✔
1763

1✔
1764
        if dnFound {
2✔
1765
                cont.postOpflexDeviceDelete(nodeUpdates)
1✔
1766
        }
1✔
1767
}
1768

1769
func (cont *AciController) writeApicSvc(key string, service *v1.Service) {
1✔
1770
        if cont.isCNOEnabled() {
1✔
1771
                return
×
1772
        }
×
1773
        aobj := apicapi.NewVmmInjectedSvc(cont.vmmDomainProvider(),
1✔
1774
                cont.config.AciVmmDomain, cont.config.AciVmmController,
1✔
1775
                service.Namespace, service.Name)
1✔
1776
        aobjDn := aobj.GetDn()
1✔
1777
        aobj.SetAttr("guid", string(service.UID))
1✔
1778

1✔
1779
        svcns := service.ObjectMeta.Namespace
1✔
1780
        _, exists, err := cont.namespaceIndexer.GetByKey(svcns)
1✔
1781
        if err != nil {
1✔
1782
                cont.log.Error("Failed to lookup ns : ", svcns, " ", err)
×
1783
                return
×
1784
        }
×
1785
        if !exists {
2✔
1786
                cont.log.Debug("Namespace of service ", service.ObjectMeta.Name, ": ", svcns, " doesn't exist, hence not sending an update to the APIC")
1✔
1787
                return
1✔
1788
        }
1✔
1789

1790
        if !cont.serviceEndPoints.SetServiceApicObject(aobj, service) {
2✔
1791
                return
1✔
1792
        }
1✔
1793
        var setApicSvcDnsName bool
1✔
1794
        if len(cont.config.ApicHosts) != 0 && apicapi.ApicVersion >= "5.1" {
1✔
1795
                setApicSvcDnsName = true
×
1796
        }
×
1797
        // APIC model only allows one of these
1798
        for _, ingress := range service.Status.LoadBalancer.Ingress {
1✔
1799
                if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1800
                        aobj.SetAttr("lbIp", ingress.IP)
×
1801
                } else if ingress.Hostname != "" {
×
1802
                        ipList, err := net.LookupHost(ingress.Hostname)
×
1803
                        if err == nil && len(ipList) > 0 {
×
1804
                                aobj.SetAttr("lbIp", ipList[0])
×
1805
                        } else {
×
1806
                                cont.log.Errorf("Lookup: err: %v, ipList: %+v", err, ipList)
×
1807
                        }
×
1808
                }
1809
                break
×
1810
        }
1811
        if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != "None" {
2✔
1812
                aobj.SetAttr("clusterIp", service.Spec.ClusterIP)
1✔
1813
        }
1✔
1814

1815
        var t string
1✔
1816
        switch service.Spec.Type {
1✔
1817
        case v1.ServiceTypeClusterIP:
×
1818
                t = "clusterIp"
×
1819
        case v1.ServiceTypeNodePort:
×
1820
                t = "nodePort"
×
1821
        case v1.ServiceTypeLoadBalancer:
1✔
1822
                t = "loadBalancer"
1✔
1823
        case v1.ServiceTypeExternalName:
×
1824
                t = "externalName"
×
1825
        }
1826
        if t != "" {
2✔
1827
                aobj.SetAttr("type", t)
1✔
1828
        }
1✔
1829

1830
        if setApicSvcDnsName || cont.config.Flavor == "k8s-overlay" {
1✔
1831
                dnsName := fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace)
×
1832

×
1833
                for _, ingress := range service.Status.LoadBalancer.Ingress {
×
1834
                        if ingress.Hostname != "" {
×
1835
                                aobj.SetAttr("dnsName", ingress.Hostname)
×
1836
                        } else if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
1837
                                aobj.SetAttr("dnsName", dnsName)
×
1838
                        }
×
1839
                }
1840
                if t == "clusterIp" || t == "nodePort" || t == "externalName" {
×
1841
                        aobj.SetAttr("dnsName", dnsName)
×
1842
                }
×
1843
        }
1844
        for _, port := range service.Spec.Ports {
2✔
1845
                proto := getProtocolStr(port.Protocol)
1✔
1846
                p := apicapi.NewVmmInjectedSvcPort(aobjDn,
1✔
1847
                        strconv.Itoa(int(port.Port)), proto, port.TargetPort.String())
1✔
1848
                p.SetAttr("nodePort", strconv.Itoa(int(port.NodePort)))
1✔
1849
                aobj.AddChild(p)
1✔
1850
        }
1✔
1851
        if cont.config.EnableVmmInjectedLabels && service.ObjectMeta.Labels != nil && apicapi.ApicVersion >= "5.2" {
1✔
1852
                for key, val := range service.ObjectMeta.Labels {
×
1853
                        newLabelKey := cont.aciNameForKey("label", key)
×
1854
                        label := apicapi.NewVmmInjectedLabel(aobj.GetDn(),
×
1855
                                newLabelKey, val)
×
1856
                        aobj.AddChild(label)
×
1857
                }
×
1858
        }
1859
        name := cont.aciNameForKey("service-vmm", key)
1✔
1860
        cont.log.Debug("Write Service Object: ", aobj)
1✔
1861
        cont.apicConn.WriteApicObjects(name, apicapi.ApicSlice{aobj})
1✔
1862
        cont.log.Debugf("svcObject: %+v", aobj)
1✔
1863
}
1864

1865
func removeAllConditions(conditions []metav1.Condition, conditionType string) []metav1.Condition {
1✔
1866
        i := 0
1✔
1867
        for _, cond := range conditions {
1✔
1868
                if cond.Type != conditionType {
×
1869
                        conditions[i] = cond
×
1870
                }
×
1871
        }
1872
        return conditions[:i]
1✔
1873
}
1874

1875
func (cont *AciController) updateServiceCondition(service *v1.Service, success bool, reason string, message string) bool {
1✔
1876
        conditionType := "LbIpamAllocation"
1✔
1877

1✔
1878
        var condition metav1.Condition
1✔
1879
        if success {
2✔
1880
                condition.Status = metav1.ConditionTrue
1✔
1881
        } else {
2✔
1882
                condition.Status = metav1.ConditionFalse
1✔
1883
                condition.Message = message
1✔
1884
        }
1✔
1885
        condition.Type = conditionType
1✔
1886
        condition.Reason = reason
1✔
1887
        condition.LastTransitionTime = metav1.Time{time.Now()}
1✔
1888
        for _, cond := range service.Status.Conditions {
2✔
1889
                if cond.Type == conditionType &&
1✔
1890
                        cond.Status == condition.Status &&
1✔
1891
                        cond.Message == condition.Message &&
1✔
1892
                        cond.Reason == condition.Reason {
2✔
1893
                        return false
1✔
1894
                }
1✔
1895
        }
1896

1897
        service.Status.Conditions = removeAllConditions(service.Status.Conditions, conditionType)
1✔
1898
        service.Status.Conditions = append(service.Status.Conditions, condition)
1✔
1899
        return true
1✔
1900
}
1901

1902
func (cont *AciController) validateRequestedIps(lbIpList []string) (net.IP, net.IP, bool) {
1✔
1903
        var ipv4, ipv6 net.IP
1✔
1904
        for _, lbIp := range lbIpList {
2✔
1905
                ip := net.ParseIP(lbIp)
1✔
1906
                if ip != nil {
2✔
1907
                        if ip.To4() != nil {
2✔
1908
                                if ipv4.Equal(net.IP{}) {
2✔
1909
                                        ipv4 = ip
1✔
1910
                                } else {
2✔
1911
                                        cont.log.Error("Annotation should have only one ipv4")
1✔
1912
                                        return ipv4, ipv6, false
1✔
1913
                                }
1✔
1914
                        } else if ip.To16() != nil {
2✔
1915
                                if ipv6.Equal(net.IP{}) {
2✔
1916
                                        ipv6 = ip
1✔
1917
                                } else {
2✔
1918
                                        cont.log.Error("Annotation should have only one ipv6")
1✔
1919
                                        return ipv4, ipv6, false
1✔
1920
                                }
1✔
1921
                        }
1922
                }
1923
        }
1924
        return ipv4, ipv6, true
1✔
1925
}
1926

1927
func (cont *AciController) returnUnusedStaticIngressIps(staticIngressIps, requestedIps []net.IP) {
1✔
1928
        for _, staticIp := range staticIngressIps {
2✔
1929
                found := false
1✔
1930
                for _, reqIp := range requestedIps {
2✔
1931
                        if reqIp.Equal(staticIp) {
2✔
1932
                                found = true
1✔
1933
                        }
1✔
1934
                }
1935
                if !found {
2✔
1936
                        returnIps(cont.staticServiceIps, []net.IP{staticIp})
1✔
1937
                }
1✔
1938
        }
1939
}
1940

1941
func (cont *AciController) allocateServiceIps(servicekey string,
1942
        service *v1.Service) bool {
1✔
1943
        logger := serviceLogger(cont.log, service)
1✔
1944
        cont.indexMutex.Lock()
1✔
1945
        meta, ok := cont.serviceMetaCache[servicekey]
1✔
1946
        if !ok {
2✔
1947
                meta = &serviceMeta{}
1✔
1948
                cont.serviceMetaCache[servicekey] = meta
1✔
1949

1✔
1950
                // Read any existing IPs and attempt to allocate them to the pod
1✔
1951
                for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1952
                        ip := net.ParseIP(ingress.IP)
1✔
1953
                        if ip == nil {
1✔
1954
                                continue
×
1955
                        }
1956
                        if ip.To4() != nil {
2✔
1957
                                if cont.serviceIps.GetV4IpCache()[0].RemoveIp(ip) {
2✔
1958
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1959
                                } else if cont.staticServiceIps.V4.RemoveIp(ip) {
3✔
1960
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1961
                                }
1✔
1962
                        } else if ip.To16() != nil {
2✔
1963
                                if cont.serviceIps.GetV6IpCache()[0].RemoveIp(ip) {
2✔
1964
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1965
                                } else if cont.staticServiceIps.V6.RemoveIp(ip) {
3✔
1966
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1967
                                }
1✔
1968
                        }
1969
                }
1970
        }
1971

1972
        if !cont.serviceSyncEnabled {
2✔
1973
                cont.indexMutex.Unlock()
1✔
1974
                return false
1✔
1975
        }
1✔
1976

1977
        var requestedIps []net.IP
1✔
1978
        // try to give the requested load balancer IP to the pod
1✔
1979
        lbIps, ok := service.ObjectMeta.Annotations[metadata.LbIpAnnotation]
1✔
1980
        if ok {
2✔
1981
                lbIpList := strings.Split(lbIps, ",")
1✔
1982
                ipv4, ipv6, valid := cont.validateRequestedIps(lbIpList)
1✔
1983
                if valid {
2✔
1984
                        if ipv4 != nil {
2✔
1985
                                requestedIps = append(requestedIps, ipv4)
1✔
1986
                        }
1✔
1987
                        if ipv6 != nil {
2✔
1988
                                requestedIps = append(requestedIps, ipv6)
1✔
1989
                        }
1✔
1990
                } else {
1✔
1991
                        cont.returnServiceIps(meta.ingressIps)
1✔
1992
                        cont.log.Error("Invalid LB IP annotation for service ", servicekey)
1✔
1993
                        condUpdated := cont.updateServiceCondition(service, false, "InvalidAnnotation", "Invalid Loadbalancer IP annotation")
1✔
1994
                        if condUpdated {
2✔
1995
                                _, err := cont.updateServiceStatus(service)
1✔
1996
                                if err != nil {
1✔
1997
                                        logger.Error("Failed to update service status : ", err)
×
1998
                                        cont.indexMutex.Unlock()
×
1999
                                        return true
×
2000
                                }
×
2001
                        }
2002
                        cont.indexMutex.Unlock()
1✔
2003
                        return false
1✔
2004
                }
2005
        } else {
1✔
2006
                requestedIp := net.ParseIP(service.Spec.LoadBalancerIP)
1✔
2007
                if requestedIp != nil {
2✔
2008
                        requestedIps = append(requestedIps, requestedIp)
1✔
2009
                }
1✔
2010
        }
2011
        if len(requestedIps) > 0 {
2✔
2012
                meta.requestedIps = []net.IP{}
1✔
2013
                for _, requestedIp := range requestedIps {
2✔
2014
                        hasRequestedIp := false
1✔
2015
                        for _, ip := range meta.staticIngressIps {
2✔
2016
                                if reflect.DeepEqual(requestedIp, ip) {
2✔
2017
                                        hasRequestedIp = true
1✔
2018
                                }
1✔
2019
                        }
2020
                        if !hasRequestedIp {
2✔
2021
                                if requestedIp.To4() != nil &&
1✔
2022
                                        cont.staticServiceIps.V4.RemoveIp(requestedIp) {
2✔
2023
                                        hasRequestedIp = true
1✔
2024
                                } else if requestedIp.To16() != nil &&
2✔
2025
                                        cont.staticServiceIps.V6.RemoveIp(requestedIp) {
2✔
2026
                                        hasRequestedIp = true
1✔
2027
                                }
1✔
2028
                        }
2029
                        if hasRequestedIp {
2✔
2030
                                meta.requestedIps = append(meta.requestedIps, requestedIp)
1✔
2031
                        }
1✔
2032
                }
2033
                cont.returnUnusedStaticIngressIps(meta.staticIngressIps, meta.requestedIps)
1✔
2034
                meta.staticIngressIps = meta.requestedIps
1✔
2035
                cont.returnServiceIps(meta.ingressIps)
1✔
2036
                meta.ingressIps = nil
1✔
2037
                // If no requested ips are allocatable
1✔
2038
                if len(meta.requestedIps) < 1 {
2✔
2039
                        logger.Error("No Requested Ip addresses available for service ", servicekey)
1✔
2040
                        condUpdated := cont.updateServiceCondition(service, false, "RequestedIpsNotAllocatable", "The requested ips for loadbalancer service are not available or not in extern static range")
1✔
2041
                        if condUpdated {
2✔
2042
                                _, err := cont.updateServiceStatus(service)
1✔
2043
                                if err != nil {
1✔
2044
                                        cont.indexMutex.Unlock()
×
2045
                                        logger.Error("Failed to update service status: ", err)
×
2046
                                        return true
×
2047
                                }
×
2048
                        }
2049
                        cont.indexMutex.Unlock()
1✔
2050
                        return false
1✔
2051
                }
2052
        } else if len(meta.requestedIps) > 0 {
1✔
2053
                meta.requestedIps = nil
×
2054
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
×
2055
                meta.staticIngressIps = nil
×
2056
        }
×
2057
        ingressIps := make([]net.IP, 0)
1✔
2058
        ingressIps = append(ingressIps, meta.ingressIps...)
1✔
2059
        ingressIps = append(ingressIps, meta.staticIngressIps...)
1✔
2060

1✔
2061
        var ipv4, ipv6 net.IP
1✔
2062
        for _, ip := range ingressIps {
2✔
2063
                if ip.To4() != nil {
2✔
2064
                        ipv4 = ip
1✔
2065
                } else if ip.To16() != nil {
3✔
2066
                        ipv6 = ip
1✔
2067
                }
1✔
2068
        }
2069
        var clusterIPv4, clusterIPv6 net.IP
1✔
2070
        clusterIPs := append([]string{service.Spec.ClusterIP}, service.Spec.ClusterIPs...)
1✔
2071
        for _, ipStr := range clusterIPs {
2✔
2072
                ip := net.ParseIP(ipStr)
1✔
2073
                if ip == nil {
1✔
2074
                        continue
×
2075
                }
2076
                if ip.To4() != nil && clusterIPv4 == nil {
2✔
2077
                        clusterIPv4 = ip
1✔
2078
                } else if ip.To16() != nil && strings.Contains(ip.String(), ":") && clusterIPv6 == nil {
3✔
2079
                        clusterIPv6 = ip
1✔
2080
                }
1✔
2081
        }
2082
        if clusterIPv4 != nil && ipv4 == nil {
2✔
2083
                if len(requestedIps) < 1 {
2✔
2084
                        ipv4, _ = cont.serviceIps.AllocateIp(true)
1✔
2085
                        if ipv4 != nil {
2✔
2086
                                ingressIps = append(ingressIps, ipv4)
1✔
2087
                        }
1✔
2088
                }
2089
        } else if clusterIPv4 == nil && ipv4 != nil {
1✔
2090
                cont.removeIpFromIngressIPList(&ingressIps, ipv4)
×
2091
        }
×
2092

2093
        if clusterIPv6 != nil && ipv6 == nil {
2✔
2094
                if len(requestedIps) < 1 {
2✔
2095
                        ipv6, _ = cont.serviceIps.AllocateIp(false)
1✔
2096
                        if ipv6 != nil {
2✔
2097
                                ingressIps = append(ingressIps, ipv6)
1✔
2098
                        }
1✔
2099
                }
2100
        } else if clusterIPv6 == nil && ipv6 != nil {
1✔
2101
                cont.removeIpFromIngressIPList(&ingressIps, ipv6)
×
2102
        }
×
2103

2104
        if len(requestedIps) < 1 {
2✔
2105
                meta.ingressIps = ingressIps
1✔
2106
        }
1✔
2107
        if ipv4 == nil && ipv6 == nil {
2✔
2108
                logger.Error("No IP addresses available for service")
1✔
2109
                cont.indexMutex.Unlock()
1✔
2110
                return true
1✔
2111
        }
1✔
2112
        cont.indexMutex.Unlock()
1✔
2113
        var newIngress []v1.LoadBalancerIngress
1✔
2114
        for _, ip := range meta.ingressIps {
2✔
2115
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2116
        }
1✔
2117
        for _, ip := range meta.staticIngressIps {
2✔
2118
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2119
        }
1✔
2120

2121
        ipUpdated := false
1✔
2122
        if !reflect.DeepEqual(newIngress, service.Status.LoadBalancer.Ingress) {
2✔
2123
                service.Status.LoadBalancer.Ingress = newIngress
1✔
2124

1✔
2125
                logger.WithFields(logrus.Fields{
1✔
2126
                        "status": service.Status.LoadBalancer.Ingress,
1✔
2127
                }).Info("Updating service load balancer status")
1✔
2128

1✔
2129
                ipUpdated = true
1✔
2130
        }
1✔
2131

2132
        success := true
1✔
2133
        reason := "Success"
1✔
2134
        message := ""
1✔
2135
        if len(requestedIps) > 0 && len(requestedIps) != len(meta.staticIngressIps) {
1✔
2136
                success = false
×
2137
                reason = "OneIpNotAllocatable"
×
2138
                message = "One of the requested Ips is not allocatable"
×
2139
        }
×
2140
        condUpdated := cont.updateServiceCondition(service, success, reason, message)
1✔
2141
        if ipUpdated || condUpdated {
2✔
2142
                _, err := cont.updateServiceStatus(service)
1✔
2143
                if err != nil {
1✔
2144
                        logger.Error("Failed to update service status: ", err)
×
2145
                        return true
×
2146
                }
×
2147
        }
2148
        return false
1✔
2149
}
2150

2151
func (cont *AciController) handleServiceDelete(servicekey string) bool {
1✔
2152
        if cont.isCNOEnabled() {
1✔
2153
                return false
×
2154
        }
×
2155
        cont.clearLbService(servicekey)
1✔
2156
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("service-vmm",
1✔
2157
                servicekey))
1✔
2158
        return false
1✔
2159
}
2160

2161
func (cont *AciController) handleServiceUpdate(service *v1.Service) bool {
1✔
2162
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2163
        if err != nil {
1✔
2164
                serviceLogger(cont.log, service).
×
2165
                        Error("Could not create service key: ", err)
×
2166
                return false
×
2167
        }
×
2168
        if cont.isCNOEnabled() {
1✔
2169
                return false
×
2170
        }
×
2171
        var requeue bool
1✔
2172
        isLoadBalancer := service.Spec.Type == v1.ServiceTypeLoadBalancer
1✔
2173
        if isLoadBalancer {
2✔
2174
                if *cont.config.AllocateServiceIps {
2✔
2175
                        requeue = cont.allocateServiceIps(servicekey, service)
1✔
2176
                }
1✔
2177
                cont.indexMutex.Lock()
1✔
2178
                if cont.serviceSyncEnabled {
2✔
2179
                        cont.indexMutex.Unlock()
1✔
2180
                        err = cont.updateServiceDeviceInstance(servicekey, service)
1✔
2181
                        if err != nil {
1✔
2182
                                serviceLogger(cont.log, service).
×
2183
                                        Error("Failed to update service device Instance: ", err)
×
2184
                                return true
×
2185
                        }
×
2186
                } else {
1✔
2187
                        cont.indexMutex.Unlock()
1✔
2188
                }
1✔
2189
        } else {
1✔
2190
                cont.clearLbService(servicekey)
1✔
2191
        }
1✔
2192
        cont.writeApicSvc(servicekey, service)
1✔
2193
        return requeue
1✔
2194
}
2195

2196
func (cont *AciController) clearLbService(servicekey string) {
1✔
2197
        cont.indexMutex.Lock()
1✔
2198
        if meta, ok := cont.serviceMetaCache[servicekey]; ok {
2✔
2199
                cont.returnServiceIps(meta.ingressIps)
1✔
2200
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
1✔
2201
                delete(cont.serviceMetaCache, servicekey)
1✔
2202
        }
1✔
2203
        cont.indexMutex.Unlock()
1✔
2204
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("svc", servicekey))
1✔
2205
}
2206

2207
func getEndpointsIps(endpoints *v1.Endpoints) map[string]bool {
1✔
2208
        ips := make(map[string]bool)
1✔
2209
        for _, subset := range endpoints.Subsets {
2✔
2210
                for _, addr := range subset.Addresses {
2✔
2211
                        ips[addr.IP] = true
1✔
2212
                }
1✔
2213
                for _, addr := range subset.NotReadyAddresses {
1✔
2214
                        ips[addr.IP] = true
×
2215
                }
×
2216
        }
2217
        return ips
1✔
2218
}
2219

2220
func (cont *AciController) processServiceTargetPorts(service *v1.Service, svcKey string, old bool) map[string]targetPort {
1✔
2221
        ports := make(map[string]targetPort)
1✔
2222
        for _, port := range service.Spec.Ports {
2✔
2223
                var key string
1✔
2224
                portnums := make(map[int]bool)
1✔
2225

1✔
2226
                if port.TargetPort.Type == intstr.String {
1✔
NEW
2227
                        entry, exists := cont.namedPortServiceIndex[svcKey]
×
NEW
2228
                        if !old {
×
NEW
2229
                                if !exists {
×
NEW
2230
                                        cont.log.Debugf("Creating named port index for service: %s, port: %s", svcKey, port.Name)
×
NEW
2231
                                        newEntry := make(namedPortServiceIndexEntry)
×
NEW
2232
                                        entry = &newEntry
×
NEW
2233
                                }
×
NEW
2234
                                (*entry)[port.Name] = &namedPortServiceIndexPort{
×
NEW
2235
                                        targetPortName: port.TargetPort.String(),
×
NEW
2236
                                        resolvedPorts:  make(map[int]bool),
×
NEW
2237
                                }
×
NEW
2238
                                cont.namedPortServiceIndex[svcKey] = entry
×
NEW
2239
                        } else if exists {
×
NEW
2240
                                delete(*entry, port.Name)
×
NEW
2241
                                cont.log.Debugf("Removed named port index for service: %s port: %s, entry: %v", svcKey, port.Name, entry)
×
NEW
2242
                                if len(*entry) == 0 {
×
NEW
2243
                                        delete(cont.namedPortServiceIndex, svcKey)
×
NEW
2244
                                } else {
×
NEW
2245
                                        cont.namedPortServiceIndex[svcKey] = entry
×
NEW
2246
                                }
×
2247
                        }
NEW
2248
                        key = portProto(&port.Protocol) + "-name-" + port.TargetPort.String()
×
2249
                } else {
1✔
2250
                        portNum := port.TargetPort.IntValue()
1✔
2251
                        if portNum <= 0 {
2✔
2252
                                portNum = int(port.Port)
1✔
2253
                        }
1✔
2254
                        key = portProto(&port.Protocol) + "-num-" + strconv.Itoa(portNum)
1✔
2255
                        portnums[portNum] = true
1✔
2256
                }
2257

2258
                ports[key] = targetPort{
1✔
2259
                        proto: port.Protocol,
1✔
2260
                        ports: portnums,
1✔
2261
                }
1✔
2262
        }
2263
        return ports
1✔
2264
}
2265

2266
func (cont *AciController) endpointsAdded(obj interface{}) {
1✔
2267
        endpoints := obj.(*v1.Endpoints)
1✔
2268
        servicekey, err := cache.MetaNamespaceKeyFunc(obj.(*v1.Endpoints))
1✔
2269
        if err != nil {
1✔
2270
                cont.log.Error("Could not create service key: ", err)
×
2271
                return
×
2272
        }
×
2273

2274
        ips := getEndpointsIps(endpoints)
1✔
2275
        cont.indexMutex.Lock()
1✔
2276
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2277
        cont.queueIPNetPolUpdates(ips)
1✔
2278
        cont.indexMutex.Unlock()
1✔
2279

1✔
2280
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2281

1✔
2282
        cont.queueServiceUpdateByKey(servicekey)
1✔
2283
}
2284

2285
func (cont *AciController) endpointsDeleted(obj interface{}) {
1✔
2286
        endpoints, isEndpoints := obj.(*v1.Endpoints)
1✔
2287
        if !isEndpoints {
1✔
2288
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2289
                if !ok {
×
2290
                        cont.log.Error("Received unexpected object: ", obj)
×
2291
                        return
×
2292
                }
×
2293
                endpoints, ok = deletedState.Obj.(*v1.Endpoints)
×
2294
                if !ok {
×
2295
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpoints object: ", deletedState.Obj)
×
2296
                        return
×
2297
                }
×
2298
        }
2299
        servicekey, err := cache.MetaNamespaceKeyFunc(endpoints)
1✔
2300
        if err != nil {
1✔
2301
                cont.log.Error("Could not create service key: ", err)
×
2302
                return
×
2303
        }
×
2304

2305
        ips := getEndpointsIps(endpoints)
1✔
2306
        cont.indexMutex.Lock()
1✔
2307
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2308
        cont.queueIPNetPolUpdates(ips)
1✔
2309
        cont.indexMutex.Unlock()
1✔
2310

1✔
2311
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2312

1✔
2313
        cont.queueServiceUpdateByKey(servicekey)
1✔
2314
}
2315

2316
func (cont *AciController) endpointsUpdated(oldEps, newEps interface{}) {
1✔
2317
        oldendpoints := oldEps.(*v1.Endpoints)
1✔
2318
        newendpoints := newEps.(*v1.Endpoints)
1✔
2319
        servicekey, err := cache.MetaNamespaceKeyFunc(newendpoints)
1✔
2320
        if err != nil {
1✔
2321
                cont.log.Error("Could not create service key: ", err)
×
2322
                return
×
2323
        }
×
2324

2325
        oldIps := getEndpointsIps(oldendpoints)
1✔
2326
        newIps := getEndpointsIps(newendpoints)
1✔
2327
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2328
                cont.indexMutex.Lock()
1✔
2329
                cont.queueIPNetPolUpdates(oldIps)
1✔
2330
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2331
                cont.queueIPNetPolUpdates(newIps)
1✔
2332
                cont.indexMutex.Unlock()
1✔
2333
        }
1✔
2334

2335
        if !reflect.DeepEqual(oldendpoints.Subsets, newendpoints.Subsets) {
2✔
2336
                cont.queueEndpointsNetPolUpdates(oldendpoints)
1✔
2337
                cont.queueEndpointsNetPolUpdates(newendpoints)
1✔
2338
        }
1✔
2339

2340
        cont.queueServiceUpdateByKey(servicekey)
1✔
2341
}
2342

2343
func (cont *AciController) serviceAdded(obj interface{}) {
1✔
2344
        service := obj.(*v1.Service)
1✔
2345
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2346
        if err != nil {
1✔
2347
                serviceLogger(cont.log, service).
×
2348
                        Error("Could not create service key: ", err)
×
2349
                return
×
2350
        }
×
2351

2352
        cont.indexMutex.Lock()
1✔
2353
        ports := cont.processServiceTargetPorts(service, servicekey, false)
1✔
2354
        cont.queuePortNetPolUpdates(ports)
1✔
2355
        cont.updateTargetPortIndex(true, servicekey, nil, ports)
1✔
2356
        cont.indexMutex.Unlock()
1✔
2357

1✔
2358
        cont.queueServiceUpdateByKey(servicekey)
1✔
2359
}
2360

2361
func (cont *AciController) serviceUpdated(oldSvc, newSvc interface{}) {
1✔
2362
        oldservice := oldSvc.(*v1.Service)
1✔
2363
        newservice := newSvc.(*v1.Service)
1✔
2364
        servicekey, err := cache.MetaNamespaceKeyFunc(newservice)
1✔
2365
        if err != nil {
1✔
2366
                serviceLogger(cont.log, newservice).
×
2367
                        Error("Could not create service key: ", err)
×
2368
                return
×
2369
        }
×
2370
        if !reflect.DeepEqual(oldservice.Spec.Ports, newservice.Spec.Ports) {
1✔
UNCOV
2371
                cont.indexMutex.Lock()
×
NEW
2372
                oldPorts := cont.processServiceTargetPorts(oldservice, servicekey, true)
×
NEW
2373
                newPorts := cont.processServiceTargetPorts(newservice, servicekey, false)
×
2374
                cont.queuePortNetPolUpdates(oldPorts)
×
2375
                cont.updateTargetPortIndex(true, servicekey, oldPorts, newPorts)
×
2376
                cont.queuePortNetPolUpdates(newPorts)
×
2377
                cont.indexMutex.Unlock()
×
2378
        }
×
2379
        cont.queueServiceUpdateByKey(servicekey)
1✔
2380
}
2381

2382
func (cont *AciController) serviceDeleted(obj interface{}) {
1✔
2383
        service, isService := obj.(*v1.Service)
1✔
2384
        if !isService {
1✔
2385
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2386
                if !ok {
×
2387
                        serviceLogger(cont.log, service).
×
2388
                                Error("Received unexpected object: ", obj)
×
2389
                        return
×
2390
                }
×
2391
                service, ok = deletedState.Obj.(*v1.Service)
×
2392
                if !ok {
×
2393
                        serviceLogger(cont.log, service).
×
2394
                                Error("DeletedFinalStateUnknown contained non-Services object: ", deletedState.Obj)
×
2395
                        return
×
2396
                }
×
2397
        }
2398
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2399
        if err != nil {
1✔
2400
                serviceLogger(cont.log, service).
×
2401
                        Error("Could not create service key: ", err)
×
2402
                return
×
2403
        }
×
2404

2405
        cont.indexMutex.Lock()
1✔
2406
        ports := cont.processServiceTargetPorts(service, servicekey, true)
1✔
2407
        cont.updateTargetPortIndex(true, servicekey, ports, nil)
1✔
2408
        cont.queuePortNetPolUpdates(ports)
1✔
2409
        delete(cont.snatServices, servicekey)
1✔
2410
        cont.indexMutex.Unlock()
1✔
2411

1✔
2412
        deletedServiceKey := "DELETED_" + servicekey
1✔
2413
        cont.queueServiceUpdateByKey(deletedServiceKey)
1✔
2414
}
2415

2416
func (cont *AciController) serviceFullSync() {
1✔
2417
        cache.ListAll(cont.serviceIndexer, labels.Everything(),
1✔
2418
                func(sobj interface{}) {
2✔
2419
                        cont.queueServiceUpdate(sobj.(*v1.Service))
1✔
2420
                })
1✔
2421
}
2422

2423
func (cont *AciController) getEndpointSliceIps(endpointSlice *discovery.EndpointSlice) map[string]bool {
1✔
2424
        ips := make(map[string]bool)
1✔
2425
        for _, endpoints := range endpointSlice.Endpoints {
2✔
2426
                for _, addr := range endpoints.Addresses {
2✔
2427
                        ips[addr] = true
1✔
2428
                }
1✔
2429
        }
2430
        return ips
1✔
2431
}
2432

2433
func (cont *AciController) notReadyEndpointPresent(endpointSlice *discovery.EndpointSlice) bool {
×
2434
        for _, endpoints := range endpointSlice.Endpoints {
×
2435
                if (endpoints.Conditions.Ready != nil && !*endpoints.Conditions.Ready) &&
×
2436
                        (endpoints.Conditions.Terminating == nil || !*endpoints.Conditions.Terminating) {
×
2437
                        return true
×
2438
                }
×
2439
        }
2440
        return false
×
2441
}
2442

2443
func (cont *AciController) getEndpointSliceEpIps(endpoints *discovery.Endpoint) map[string]bool {
×
2444
        ips := make(map[string]bool)
×
2445
        for _, addr := range endpoints.Addresses {
×
2446
                ips[addr] = true
×
2447
        }
×
2448
        return ips
×
2449
}
2450

2451
func (cont *AciController) processDelayedEpSlices() {
1✔
2452
        var processEps []DelayedEpSlice
1✔
2453
        cont.indexMutex.Lock()
1✔
2454
        for i := 0; i < len(cont.delayedEpSlices); i++ {
1✔
2455
                delayedepslice := cont.delayedEpSlices[i]
×
2456
                if time.Now().After(delayedepslice.DelayedTime) {
×
2457
                        var toprocess DelayedEpSlice
×
2458
                        err := util.DeepCopyObj(&delayedepslice, &toprocess)
×
2459
                        if err != nil {
×
2460
                                cont.log.Error(err)
×
2461
                                continue
×
2462
                        }
2463
                        processEps = append(processEps, toprocess)
×
2464
                        cont.delayedEpSlices = append(cont.delayedEpSlices[:i], cont.delayedEpSlices[i+1:]...)
×
2465
                }
2466
        }
2467

2468
        cont.indexMutex.Unlock()
1✔
2469
        for _, epslice := range processEps {
1✔
2470
                //ignore the epslice if newly added endpoint is not ready
×
2471
                if cont.notReadyEndpointPresent(epslice.NewEpSlice) {
×
2472
                        cont.log.Debug("Ignoring the update as the new endpoint is not ready : ", epslice.NewEpSlice)
×
2473
                } else {
×
2474
                        cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice)
×
2475
                        cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice)
×
2476
                }
×
2477
        }
2478
}
2479

2480
func (cont *AciController) resolveServiceNamedPortFromEpSlice(epSlice *discovery.EndpointSlice, serviceKey string, old bool) {
1✔
2481
        indexEntry, ok := cont.namedPortServiceIndex[serviceKey]
1✔
2482
        if !ok {
2✔
2483
                return
1✔
2484
        }
1✔
NEW
2485
        for _, port := range epSlice.Ports {
×
NEW
2486
                if port.Name == nil || port.Port == nil {
×
NEW
2487
                        continue
×
2488
                }
NEW
2489
                if portEntry, ok := (*indexEntry)[*port.Name]; ok && portEntry != nil {
×
NEW
2490
                        portNum := int(*port.Port)
×
NEW
2491
                        if old {
×
NEW
2492
                                delete(portEntry.resolvedPorts, portNum)
×
NEW
2493
                                cont.log.Debugf("Deleting port: %d from service %s resolved target port. Resolved ports: %v", portNum, serviceKey, portEntry.resolvedPorts)
×
NEW
2494
                        } else {
×
NEW
2495
                                portEntry.resolvedPorts[portNum] = true
×
NEW
2496
                                cont.log.Debugf("Adding port: %d to service %s resolved target port. Resolved ports: %v", portNum, serviceKey, portEntry.resolvedPorts)
×
NEW
2497
                        }
×
NEW
2498
                        key := portProto(port.Protocol) + "-num-" + strconv.Itoa(portNum)
×
NEW
2499
                        targetPortIndexEntry := cont.targetPortIndex[key]
×
NEW
2500
                        if targetPortIndexEntry == nil && len(portEntry.resolvedPorts) == 1 {
×
NEW
2501
                                targetPortIndexEntry = &portIndexEntry{
×
NEW
2502
                                        port: targetPort{
×
NEW
2503
                                                proto: *port.Protocol,
×
NEW
2504
                                                ports: make(map[int]bool),
×
NEW
2505
                                        },
×
NEW
2506
                                        serviceKeys:       make(map[string]bool),
×
NEW
2507
                                        networkPolicyKeys: make(map[string]bool),
×
NEW
2508
                                }
×
NEW
2509
                                targetPortIndexEntry.port.ports[portNum] = true
×
NEW
2510
                                cont.targetPortIndex[key] = targetPortIndexEntry
×
NEW
2511
                        }
×
NEW
2512
                        if targetPortIndexEntry != nil {
×
NEW
2513
                                if len(portEntry.resolvedPorts) == 1 {
×
NEW
2514
                                        targetPortIndexEntry.serviceKeys[serviceKey] = true
×
NEW
2515
                                } else {
×
NEW
2516
                                        delete(targetPortIndexEntry.serviceKeys, serviceKey)
×
NEW
2517
                                }
×
2518
                        }
2519
                }
2520
        }
2521
}
2522
func (cont *AciController) endpointSliceAdded(obj interface{}) {
1✔
2523
        endpointslice, ok := obj.(*discovery.EndpointSlice)
1✔
2524
        if !ok {
1✔
2525
                cont.log.Error("error processing Endpointslice object: ", obj)
×
2526
                return
×
2527
        }
×
2528
        servicekey, valid := getServiceKey(endpointslice)
1✔
2529
        if !valid {
1✔
2530
                return
×
2531
        }
×
2532
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2533
        cont.indexMutex.Lock()
1✔
2534
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2535
        cont.resolveServiceNamedPortFromEpSlice(endpointslice, servicekey, false)
1✔
2536
        cont.queueIPNetPolUpdates(ips)
1✔
2537
        cont.indexMutex.Unlock()
1✔
2538

1✔
2539
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2540

1✔
2541
        cont.queueServiceUpdateByKey(servicekey)
1✔
2542
        cont.log.Info("EndPointSlice Object Added: ", servicekey)
1✔
2543
}
2544

2545
func (cont *AciController) endpointSliceDeleted(obj interface{}) {
1✔
2546
        endpointslice, isEndpointslice := obj.(*discovery.EndpointSlice)
1✔
2547
        if !isEndpointslice {
1✔
2548
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
2549
                if !ok {
×
2550
                        cont.log.Error("Received unexpected object: ", obj)
×
2551
                        return
×
2552
                }
×
2553
                endpointslice, ok = deletedState.Obj.(*discovery.EndpointSlice)
×
2554
                if !ok {
×
2555
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpointslice object: ", deletedState.Obj)
×
2556
                        return
×
2557
                }
×
2558
        }
2559
        servicekey, valid := getServiceKey(endpointslice)
1✔
2560
        if !valid {
1✔
2561
                return
×
2562
        }
×
2563
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2564
        cont.indexMutex.Lock()
1✔
2565
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2566
        cont.resolveServiceNamedPortFromEpSlice(endpointslice, servicekey, true)
1✔
2567
        cont.queueIPNetPolUpdates(ips)
1✔
2568
        cont.indexMutex.Unlock()
1✔
2569
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2570
        cont.queueServiceUpdateByKey(servicekey)
1✔
2571
}
2572

2573
// Checks if the given service is present in the user configured list of services
2574
// for pbr delay and if present, returns the servie specific delay if configured
2575
func (cont *AciController) svcInAddDelayList(name, ns string) (int, bool) {
×
2576
        for _, svc := range cont.config.ServiceGraphEndpointAddDelay.Services {
×
2577
                if svc.Name == name && svc.Namespace == ns {
×
2578
                        return svc.Delay, true
×
2579
                }
×
2580
        }
2581
        return 0, false
×
2582
}
2583

2584
// Check if the endpointslice update notification has any deletion of enpoint
2585
func (cont *AciController) isDeleteEndpointSlice(oldendpointslice, newendpointslice *discovery.EndpointSlice) bool {
×
2586
        del := false
×
2587

×
2588
        // if any endpoint is removed from endpontslice
×
2589
        if len(newendpointslice.Endpoints) < len(oldendpointslice.Endpoints) {
×
2590
                del = true
×
2591
        }
×
2592

2593
        if !del {
×
2594
                // if any one of the endpoint is in terminating state
×
2595
                for _, endpoint := range newendpointslice.Endpoints {
×
2596
                        if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
×
2597
                                del = true
×
2598
                                break
×
2599
                        }
2600
                }
2601
        }
2602
        if !del {
×
2603
                // if any one of endpoint moved from ready state to not-ready state
×
2604
                for ix := range oldendpointslice.Endpoints {
×
2605
                        oldips := cont.getEndpointSliceEpIps(&oldendpointslice.Endpoints[ix])
×
2606
                        for newIx := range newendpointslice.Endpoints {
×
2607
                                newips := cont.getEndpointSliceEpIps(&newendpointslice.Endpoints[newIx])
×
2608
                                if reflect.DeepEqual(oldips, newips) {
×
2609
                                        if (oldendpointslice.Endpoints[ix].Conditions.Ready != nil && *oldendpointslice.Endpoints[ix].Conditions.Ready) &&
×
2610
                                                (newendpointslice.Endpoints[newIx].Conditions.Ready != nil && !*newendpointslice.Endpoints[newIx].Conditions.Ready) {
×
2611
                                                del = true
×
2612
                                        }
×
2613
                                        break
×
2614
                                }
2615
                        }
2616
                }
2617
        }
2618
        return del
×
2619
}
2620

2621
func (cont *AciController) doendpointSliceUpdatedDelay(oldendpointslice *discovery.EndpointSlice,
2622
        newendpointslice *discovery.EndpointSlice) {
×
2623
        svc, ns, valid := getServiceNameAndNs(newendpointslice)
×
2624
        if !valid {
×
2625
                return
×
2626
        }
×
2627
        svckey, valid := getServiceKey(newendpointslice)
×
2628
        if !valid {
×
2629
                return
×
2630
        }
×
2631
        delay := cont.config.ServiceGraphEndpointAddDelay.Delay
×
2632
        svcDelay, exists := cont.svcInAddDelayList(svc, ns)
×
2633
        if svcDelay > 0 {
×
2634
                delay = svcDelay
×
2635
        }
×
2636
        delayedsvc := exists && delay > 0
×
2637
        if delayedsvc {
×
2638
                cont.log.Debug("Delay of ", delay, " seconds is applicable for svc :", svc, " in ns: ", ns)
×
2639
                var delayedepslice DelayedEpSlice
×
2640
                delayedepslice.OldEpSlice = oldendpointslice
×
2641
                delayedepslice.ServiceKey = svckey
×
2642
                delayedepslice.NewEpSlice = newendpointslice
×
2643
                currentTime := time.Now()
×
2644
                delayedepslice.DelayedTime = currentTime.Add(time.Duration(delay) * time.Second)
×
2645
                cont.indexMutex.Lock()
×
2646
                cont.delayedEpSlices = append(cont.delayedEpSlices, &delayedepslice)
×
2647
                cont.indexMutex.Unlock()
×
2648
        } else {
×
2649
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2650
        }
×
2651

2652
        if delayedsvc && cont.isDeleteEndpointSlice(oldendpointslice, newendpointslice) {
×
2653
                cont.log.Debug("Proceeding by ignoring delay as the update is due to delete of endpoint")
×
2654
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2655
        }
×
2656
}
2657

2658
func (cont *AciController) endpointSliceUpdated(oldobj, newobj interface{}) {
1✔
2659
        oldendpointslice, ok := oldobj.(*discovery.EndpointSlice)
1✔
2660
        if !ok {
1✔
2661
                cont.log.Error("error processing Endpointslice object: ", oldobj)
×
2662
                return
×
2663
        }
×
2664
        newendpointslice, ok := newobj.(*discovery.EndpointSlice)
1✔
2665
        if !ok {
1✔
2666
                cont.log.Error("error processing Endpointslice object: ", newobj)
×
2667
                return
×
2668
        }
×
2669
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2670
                cont.doendpointSliceUpdatedDelay(oldendpointslice, newendpointslice)
×
2671
        } else {
1✔
2672
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
1✔
2673
        }
1✔
2674
}
2675

2676
func (cont *AciController) doendpointSliceUpdated(oldendpointslice *discovery.EndpointSlice,
2677
        newendpointslice *discovery.EndpointSlice) {
1✔
2678
        servicekey, valid := getServiceKey(newendpointslice)
1✔
2679
        if !valid {
1✔
2680
                return
×
2681
        }
×
2682
        oldIps := cont.getEndpointSliceIps(oldendpointslice)
1✔
2683
        newIps := cont.getEndpointSliceIps(newendpointslice)
1✔
2684
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2685
                cont.indexMutex.Lock()
1✔
2686
                cont.resolveServiceNamedPortFromEpSlice(oldendpointslice, servicekey, true)
1✔
2687
                cont.resolveServiceNamedPortFromEpSlice(newendpointslice, servicekey, false)
1✔
2688
                cont.queueIPNetPolUpdates(oldIps)
1✔
2689
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2690
                cont.queueIPNetPolUpdates(newIps)
1✔
2691
                cont.indexMutex.Unlock()
1✔
2692
        }
1✔
2693

2694
        if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) {
2✔
2695
                cont.queueEndpointSliceNetPolUpdates(oldendpointslice)
1✔
2696
                cont.queueEndpointSliceNetPolUpdates(newendpointslice)
1✔
2697
        }
1✔
2698
        cont.log.Debug("EndPointSlice Object Update: ", servicekey)
1✔
2699
        cont.queueServiceUpdateByKey(servicekey)
1✔
2700
}
2701

2702
func (cont *AciController) queueEndpointSliceNetPolUpdates(endpointslice *discovery.EndpointSlice) {
1✔
2703
        for _, endpoint := range endpointslice.Endpoints {
2✔
2704
                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" ||
1✔
2705
                        endpoint.TargetRef.Namespace == "" || endpoint.TargetRef.Name == "" {
2✔
2706
                        continue
1✔
2707
                }
2708
                if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
1✔
2709
                        continue
×
2710
                }
2711
                podkey := endpoint.TargetRef.Namespace + "/" + endpoint.TargetRef.Name
1✔
2712
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
2713
                ps := make(map[string]bool)
1✔
2714
                for _, npkey := range npkeys {
2✔
2715
                        cont.queueNetPolUpdateByKey(npkey)
1✔
2716
                }
1✔
2717
                // Process if the  any matching namedport wildcard policy is present
2718
                // ignore np already processed policies
2719
                cont.queueMatchingNamedNp(ps, podkey)
1✔
2720
        }
2721
}
2722

2723
func getServiceKey(endPointSlice *discovery.EndpointSlice) (string, bool) {
1✔
2724
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
1✔
2725
        if !ok {
1✔
2726
                return "", false
×
2727
        }
×
2728
        return endPointSlice.ObjectMeta.Namespace + "/" + serviceName, true
1✔
2729
}
2730

2731
func getServiceNameAndNs(endPointSlice *discovery.EndpointSlice) (string, string, bool) {
×
2732
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
×
2733
        if !ok {
×
2734
                return "", "", false
×
2735
        }
×
2736
        return serviceName, endPointSlice.ObjectMeta.Namespace, true
×
2737
}
2738

2739
// can be called with index lock
2740
func (sep *serviceEndpoint) UpdateServicesForNode(nodename string) {
1✔
2741
        cont := sep.cont
1✔
2742
        cache.ListAll(cont.endpointsIndexer, labels.Everything(),
1✔
2743
                func(endpointsobj interface{}) {
2✔
2744
                        endpoints := endpointsobj.(*v1.Endpoints)
1✔
2745
                        for _, subset := range endpoints.Subsets {
2✔
2746
                                for _, addr := range subset.Addresses {
2✔
2747
                                        if addr.NodeName != nil && *addr.NodeName == nodename {
2✔
2748
                                                servicekey, err :=
1✔
2749
                                                        cache.MetaNamespaceKeyFunc(endpointsobj.(*v1.Endpoints))
1✔
2750
                                                if err != nil {
1✔
2751
                                                        cont.log.Error("Could not create endpoints key: ", err)
×
2752
                                                        return
×
2753
                                                }
×
2754
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2755
                                                return
1✔
2756
                                        }
2757
                                }
2758
                        }
2759
                })
2760
}
2761

2762
func (seps *serviceEndpointSlice) UpdateServicesForNode(nodename string) {
1✔
2763
        // 1. List all the endpointslice and check for matching nodename
1✔
2764
        // 2. if it matches trigger the Service update and mark it visited
1✔
2765
        cont := seps.cont
1✔
2766
        visited := make(map[string]bool)
1✔
2767
        cache.ListAll(cont.endpointSliceIndexer, labels.Everything(),
1✔
2768
                func(endpointSliceobj interface{}) {
2✔
2769
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2770
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2771
                                if endpoint.NodeName != nil && *endpoint.NodeName == nodename {
2✔
2772
                                        servicekey, valid := getServiceKey(endpointSlices)
1✔
2773
                                        if !valid {
1✔
2774
                                                return
×
2775
                                        }
×
2776
                                        if _, ok := visited[servicekey]; !ok {
2✔
2777
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2778
                                                visited[servicekey] = true
1✔
2779
                                                return
1✔
2780
                                        }
1✔
2781
                                }
2782
                        }
2783
                })
2784
}
2785
func (cont *AciController) setNodeMap(nodeMap map[string]*metadata.ServiceEndpoint, nodeName string) {
1✔
2786
        nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
2787
        if !ok {
1✔
2788
                return
×
2789
        }
×
2790
        _, ok = cont.fabricPathForNode(nodeName)
1✔
2791
        if !ok {
2✔
2792
                return
1✔
2793
        }
1✔
2794
        nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
2795
}
2796

2797
// 2 cases when epslices corresponding to given service is presnt in delayedEpSlices:
2798
//  1. endpoint not present in delayedEpSlices of the service
2799
//  2. endpoint present in delayedEpSlices of the service but in not ready state
2800
//
2801
// indexMutex lock must be acquired before calling the function
2802
func (cont *AciController) isDelayedEndpoint(endpoint *discovery.Endpoint, svckey string) bool {
×
2803
        delayed := false
×
2804
        endpointips := cont.getEndpointSliceEpIps(endpoint)
×
2805
        for _, delayedepslices := range cont.delayedEpSlices {
×
2806
                if delayedepslices.ServiceKey == svckey {
×
2807
                        var found bool
×
2808
                        epslice := delayedepslices.OldEpSlice
×
2809
                        for ix := range epslice.Endpoints {
×
2810
                                epips := cont.getEndpointSliceEpIps(&epslice.Endpoints[ix])
×
2811
                                if reflect.DeepEqual(endpointips, epips) {
×
2812
                                        // case 2
×
2813
                                        if epslice.Endpoints[ix].Conditions.Ready != nil && !*epslice.Endpoints[ix].Conditions.Ready {
×
2814
                                                delayed = true
×
2815
                                        }
×
2816
                                        found = true
×
2817
                                }
2818
                        }
2819
                        // case 1
2820
                        if !found {
×
2821
                                delayed = true
×
2822
                        }
×
2823
                }
2824
        }
2825
        return delayed
×
2826
}
2827

2828
// set nodemap only if endoint is ready and not in delayedEpSlices
2829
func (cont *AciController) setNodeMapDelay(nodeMap map[string]*metadata.ServiceEndpoint,
2830
        endpoint *discovery.Endpoint, service *v1.Service) {
×
2831
        svckey, err := cache.MetaNamespaceKeyFunc(service)
×
2832
        if err != nil {
×
2833
                cont.log.Error("Could not create service key: ", err)
×
2834
                return
×
2835
        }
×
2836
        if cont.config.NoWaitForServiceEpReadiness ||
×
2837
                (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {
×
2838
                if endpoint.NodeName != nil && *endpoint.NodeName != "" {
×
2839
                        // donot setNodeMap for endpoint if:
×
2840
                        //   endpoint is newly added
×
2841
                        //   endpoint status changed from not ready to ready
×
2842
                        if !cont.isDelayedEndpoint(endpoint, svckey) {
×
2843
                                cont.setNodeMap(nodeMap, *endpoint.NodeName)
×
2844
                        }
×
2845
                }
2846
        }
2847
}
2848

2849
func (sep *serviceEndpoint) GetnodesMetadata(key string,
2850
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2851
        cont := sep.cont
1✔
2852
        endpointsobj, exists, err := cont.endpointsIndexer.GetByKey(key)
1✔
2853
        if err != nil {
1✔
2854
                cont.log.Error("Could not lookup endpoints for " +
×
2855
                        key + ": " + err.Error())
×
2856
        }
×
2857
        if exists && endpointsobj != nil {
2✔
2858
                endpoints := endpointsobj.(*v1.Endpoints)
1✔
2859
                for _, subset := range endpoints.Subsets {
2✔
2860
                        for _, addr := range subset.Addresses {
2✔
2861
                                if addr.NodeName == nil {
2✔
2862
                                        continue
1✔
2863
                                }
2864
                                cont.setNodeMap(nodeMap, *addr.NodeName)
1✔
2865
                        }
2866
                }
2867
        }
2868
        cont.log.Info("NodeMap: ", nodeMap)
1✔
2869
}
2870

2871
func (seps *serviceEndpointSlice) GetnodesMetadata(key string,
2872
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2873
        cont := seps.cont
1✔
2874
        // 1. Get all the Endpoint slices matching the label service-name
1✔
2875
        // 2. update the node map matching with endpoints nodes name
1✔
2876
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2877
        selector := labels.SelectorFromSet(label)
1✔
2878
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2879
                func(endpointSliceobj interface{}) {
2✔
2880
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2881
                        for ix := range endpointSlices.Endpoints {
2✔
2882
                                if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2883
                                        cont.setNodeMapDelay(nodeMap, &endpointSlices.Endpoints[ix], service)
×
2884
                                } else if cont.config.NoWaitForServiceEpReadiness ||
1✔
2885
                                        (endpointSlices.Endpoints[ix].Conditions.Ready != nil && *endpointSlices.Endpoints[ix].Conditions.Ready) {
2✔
2886
                                        if endpointSlices.Endpoints[ix].NodeName != nil && *endpointSlices.Endpoints[ix].NodeName != "" {
2✔
2887
                                                cont.setNodeMap(nodeMap, *endpointSlices.Endpoints[ix].NodeName)
1✔
2888
                                        }
1✔
2889
                                }
2890
                        }
2891
                })
2892
        cont.log.Debug("NodeMap: ", nodeMap)
1✔
2893
}
2894

2895
func (sep *serviceEndpoint) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2896
        cont := sep.cont
1✔
2897
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
2898
        if err != nil {
1✔
2899
                serviceLogger(cont.log, service).
×
2900
                        Error("Could not create service key: ", err)
×
2901
                return false
×
2902
        }
×
2903
        endpointsobj, _, err := cont.endpointsIndexer.GetByKey(key)
1✔
2904
        if err != nil {
1✔
2905
                cont.log.Error("Could not lookup endpoints for " +
×
2906
                        key + ": " + err.Error())
×
2907
                return false
×
2908
        }
×
2909
        if endpointsobj != nil {
2✔
2910
                for _, subset := range endpointsobj.(*v1.Endpoints).Subsets {
2✔
2911
                        for _, addr := range subset.Addresses {
2✔
2912
                                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" {
1✔
2913
                                        continue
×
2914
                                }
2915
                                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(),
1✔
2916
                                        addr.TargetRef.Name))
1✔
2917
                        }
2918
                }
2919
        }
2920
        return true
1✔
2921
}
2922

2923
func (seps *serviceEndpointSlice) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2924
        cont := seps.cont
1✔
2925
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2926
        selector := labels.SelectorFromSet(label)
1✔
2927
        epcount := 0
1✔
2928
        childs := make(map[string]struct{})
1✔
2929
        var exists = struct{}{}
1✔
2930
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2931
                func(endpointSliceobj interface{}) {
2✔
2932
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2933
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2934
                                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
1✔
2935
                                        continue
×
2936
                                }
2937
                                epcount++
1✔
2938
                                childs[endpoint.TargetRef.Name] = exists
1✔
2939
                                cont.log.Debug("EndPoint added: ", endpoint.TargetRef.Name)
1✔
2940
                        }
2941
                })
2942
        for child := range childs {
2✔
2943
                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(), child))
1✔
2944
        }
1✔
2945
        return epcount != 0
1✔
2946
}
2947

2948
func getProtocolStr(proto v1.Protocol) string {
1✔
2949
        var protostring string
1✔
2950
        switch proto {
1✔
2951
        case v1.ProtocolUDP:
1✔
2952
                protostring = "udp"
1✔
2953
        case v1.ProtocolTCP:
1✔
2954
                protostring = "tcp"
1✔
2955
        case v1.ProtocolSCTP:
×
2956
                protostring = "sctp"
×
2957
        default:
×
2958
                protostring = "tcp"
×
2959
        }
2960
        return protostring
1✔
2961
}
2962

2963
func (cont *AciController) removeIpFromIngressIPList(ingressIps *[]net.IP, ip net.IP) {
×
2964
        cont.returnServiceIps([]net.IP{ip})
×
2965
        index := -1
×
2966
        for i, v := range *ingressIps {
×
2967
                if v.Equal(ip) {
×
2968
                        index = i
×
2969
                        break
×
2970
                }
2971
        }
2972
        if index == -1 {
×
2973
                return
×
2974
        }
×
2975
        *ingressIps = append((*ingressIps)[:index], (*ingressIps)[index+1:]...)
×
2976
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc