• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11917

04 May 2026 06:57AM UTC coverage: 63.383% (+0.4%) from 63.028%
11917

Pull #1706

travis-pro

jeffinkottaram
Fix per-service SNAT IP assignment for no-SnatIp policies

Two bugs are fixed:

1. Namespace-level no-SnatIp SNAT policies with multiple LoadBalancer
   services incorrectly assigned all LB external IPs to all pods.
   The controller now tags each global info entry with the originating
   service key. The hostagent filters SNAT UIDs per-pod by checking
   service endpoint membership, ensuring each pod only egresses with
   the external IP of the service it belongs to.

2. After hostagent pod restarts, namespace-level no-SnatIp policies
   lost their service SNAT entries because pod events did not evaluate
   service membership for the namespace-scoped policy path. The
   namespace-scoped branch now checks for matching services when
   processing pod events, consistent with the label-scoped path.

Additionally, two inconsistencies are corrected:

1. The deployment path for no-SnatIp policies incorrectly matched
deployment metadata labels against service pod selectors. Since
services select pods (not deployments), and pod template changes
produce new pod events handled separately, this check was both
inaccurate and redundant. It is removed.

2. Explicit-SnatIp policies matching services now consistently apply the
specified SNAT IP to the service's backend pods across all policy
evaluation paths.
Pull Request #1706: Fix per-service SNAT IP assignment for no-SnatIp policies

57 of 87 new or added lines in 2 files covered. (65.52%)

1119 existing lines in 9 files now uncovered.

13628 of 21501 relevant lines covered (63.38%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.63
/pkg/controller/services.go
1
// Copyright 2016 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
        "errors"
19
        "fmt"
20
        "net"
21
        "reflect"
22
        "regexp"
23
        "sort"
24
        "strconv"
25
        "strings"
26
        "time"
27

28
        "github.com/noironetworks/aci-containers/pkg/apicapi"
29
        "github.com/noironetworks/aci-containers/pkg/metadata"
30
        "github.com/noironetworks/aci-containers/pkg/util"
31
        "github.com/sirupsen/logrus"
32
        v1 "k8s.io/api/core/v1"
33
        discovery "k8s.io/api/discovery/v1"
34
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
        "k8s.io/apimachinery/pkg/fields"
36
        "k8s.io/apimachinery/pkg/labels"
37
        "k8s.io/apimachinery/pkg/util/intstr"
38
        "k8s.io/client-go/kubernetes"
39
        "k8s.io/client-go/tools/cache"
40
)
41

42
// Package-level defaults for service contract and external subnet scope.
const (
	// DefaultServiceContractScope is the default service contract scope value.
	DefaultServiceContractScope = "context"

	// DefaultServiceExtSubNetShared is the default service ext subnet scope
	// setting (whether shared security is enabled).
	DefaultServiceExtSubNetShared = false
)
47

48
func (cont *AciController) initEndpointsInformerFromClient(
49
        kubeClient kubernetes.Interface) {
×
50
        cont.initEndpointsInformerBase(
×
51
                cache.NewListWatchFromClient(
×
52
                        kubeClient.CoreV1().RESTClient(), "endpoints",
×
53
                        metav1.NamespaceAll, fields.Everything()))
×
54
}
×
55

56
func (cont *AciController) initEndpointSliceInformerFromClient(
UNCOV
57
        kubeClient kubernetes.Interface) {
×
UNCOV
58
        cont.initEndpointSliceInformerBase(
×
UNCOV
59
                cache.NewListWatchFromClient(
×
UNCOV
60
                        kubeClient.DiscoveryV1().RESTClient(), "endpointslices",
×
UNCOV
61
                        metav1.NamespaceAll, fields.Everything()))
×
UNCOV
62
}
×
63

64
func (cont *AciController) initEndpointSliceInformerBase(listWatch *cache.ListWatch) {
1✔
65
        cont.endpointSliceIndexer, cont.endpointSliceInformer = cache.NewIndexerInformer(
1✔
66
                listWatch, &discovery.EndpointSlice{}, 0,
1✔
67
                cache.ResourceEventHandlerFuncs{
1✔
68
                        AddFunc: func(obj interface{}) {
2✔
69
                                cont.endpointSliceAdded(obj)
1✔
70
                        },
1✔
71
                        UpdateFunc: func(oldobj interface{}, newobj interface{}) {
1✔
72
                                cont.endpointSliceUpdated(oldobj, newobj)
1✔
73
                        },
1✔
74
                        DeleteFunc: func(obj interface{}) {
1✔
75
                                cont.endpointSliceDeleted(obj)
1✔
76
                        },
1✔
77
                },
78
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
79
        )
80
}
81

82
func (cont *AciController) initEndpointsInformerBase(listWatch *cache.ListWatch) {
1✔
83
        cont.endpointsIndexer, cont.endpointsInformer = cache.NewIndexerInformer(
1✔
84
                listWatch, &v1.Endpoints{}, 0,
1✔
85
                cache.ResourceEventHandlerFuncs{
1✔
86
                        AddFunc: func(obj interface{}) {
2✔
87
                                cont.endpointsAdded(obj)
1✔
88
                        },
1✔
89
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
90
                                cont.endpointsUpdated(old, new)
1✔
91
                        },
1✔
92
                        DeleteFunc: func(obj interface{}) {
1✔
93
                                cont.endpointsDeleted(obj)
1✔
94
                        },
1✔
95
                },
96
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
97
        )
98
}
99

100
func (cont *AciController) initServiceInformerFromClient(
UNCOV
101
        kubeClient *kubernetes.Clientset) {
×
UNCOV
102
        cont.initServiceInformerBase(
×
UNCOV
103
                cache.NewListWatchFromClient(
×
UNCOV
104
                        kubeClient.CoreV1().RESTClient(), "services",
×
UNCOV
105
                        metav1.NamespaceAll, fields.Everything()))
×
UNCOV
106
}
×
107

108
func (cont *AciController) initServiceInformerBase(listWatch *cache.ListWatch) {
1✔
109
        cont.serviceIndexer, cont.serviceInformer = cache.NewIndexerInformer(
1✔
110
                listWatch, &v1.Service{}, 0,
1✔
111
                cache.ResourceEventHandlerFuncs{
1✔
112
                        AddFunc: func(obj interface{}) {
2✔
113
                                cont.serviceAdded(obj)
1✔
114
                        },
1✔
115
                        UpdateFunc: func(old interface{}, new interface{}) {
1✔
116
                                cont.serviceUpdated(old, new)
1✔
117
                        },
1✔
118
                        DeleteFunc: func(obj interface{}) {
×
UNCOV
119
                                cont.serviceDeleted(obj)
×
UNCOV
120
                        },
×
121
                },
122
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
123
        )
124
}
125

126
func serviceLogger(log *logrus.Logger, as *v1.Service) *logrus.Entry {
1✔
127
        return log.WithFields(logrus.Fields{
1✔
128
                "namespace": as.ObjectMeta.Namespace,
1✔
129
                "name":      as.ObjectMeta.Name,
1✔
130
                "type":      as.Spec.Type,
1✔
131
        })
1✔
132
}
1✔
133

134
func (cont *AciController) queueIPNetPolUpdates(ips map[string]bool) {
1✔
135
        for ipStr := range ips {
2✔
136
                ip := net.ParseIP(ipStr)
1✔
137
                if ip == nil {
1✔
UNCOV
138
                        continue
×
139
                }
140
                entries, err := cont.netPolSubnetIndex.ContainingNetworks(ip)
1✔
141
                if err != nil {
1✔
UNCOV
142
                        cont.log.Error("Corrupted network policy IP index, err: ", err)
×
UNCOV
143
                        return
×
UNCOV
144
                }
×
145
                for _, entry := range entries {
2✔
146
                        for npkey := range entry.(*ipIndexEntry).keys {
2✔
147
                                cont.queueNetPolUpdateByKey(npkey)
1✔
148
                        }
1✔
149
                }
150
        }
151
}
152

153
func (cont *AciController) queuePortNetPolUpdates(ports map[string]targetPort) {
1✔
154
        for portkey := range ports {
2✔
155
                entry := cont.targetPortIndex[portkey]
1✔
156
                if entry == nil {
2✔
157
                        continue
1✔
158
                }
159
                for npkey := range entry.networkPolicyKeys {
2✔
160
                        cont.queueNetPolUpdateByKey(npkey)
1✔
161
                }
1✔
162
        }
163
}
164

165
func (cont *AciController) queueNetPolForEpAddrs(addrs []v1.EndpointAddress) {
1✔
166
        for _, addr := range addrs {
2✔
167
                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" ||
1✔
168
                        addr.TargetRef.Namespace == "" || addr.TargetRef.Name == "" {
2✔
169
                        continue
1✔
170
                }
171
                podkey := addr.TargetRef.Namespace + "/" + addr.TargetRef.Name
1✔
172
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
173
                ps := make(map[string]bool)
1✔
174
                for _, npkey := range npkeys {
2✔
175
                        cont.queueNetPolUpdateByKey(npkey)
1✔
176
                        ps[npkey] = true
1✔
177
                }
1✔
178
                // Process if the  any matching namedport wildcard policy is present
179
                // ignore np already processed policies
180
                cont.queueMatchingNamedNp(ps, podkey)
1✔
181
        }
182
}
183

184
func (cont *AciController) queueMatchingNamedNp(served map[string]bool, podkey string) {
1✔
185
        cont.indexMutex.Lock()
1✔
186
        for npkey := range cont.nmPortNp {
2✔
187
                if _, ok := served[npkey]; !ok {
2✔
188
                        if cont.checkPodNmpMatchesNp(npkey, podkey) {
2✔
189
                                cont.queueNetPolUpdateByKey(npkey)
1✔
190
                        }
1✔
191
                }
192
        }
193
        cont.indexMutex.Unlock()
1✔
194
}
195
func (cont *AciController) queueEndpointsNetPolUpdates(endpoints *v1.Endpoints) {
1✔
196
        for _, subset := range endpoints.Subsets {
2✔
197
                cont.queueNetPolForEpAddrs(subset.Addresses)
1✔
198
                cont.queueNetPolForEpAddrs(subset.NotReadyAddresses)
1✔
199
        }
1✔
200
}
201

202
func (cont *AciController) returnServiceIps(ips []net.IP) {
1✔
203
        for _, ip := range ips {
2✔
204
                if ip.To4() != nil {
2✔
205
                        cont.serviceIps.DeallocateIp(ip)
1✔
206
                } else if ip.To16() != nil {
3✔
207
                        cont.serviceIps.DeallocateIp(ip)
1✔
208
                }
1✔
209
        }
210
}
211

212
func returnIps(pool *netIps, ips []net.IP) {
1✔
213
        for _, ip := range ips {
2✔
214
                if ip.To4() != nil {
2✔
215
                        pool.V4.AddIp(ip)
1✔
216
                } else if ip.To16() != nil {
3✔
217
                        pool.V6.AddIp(ip)
1✔
218
                }
1✔
219
        }
220
}
221

222
func (cont *AciController) staticMonPolName() string {
1✔
223
        return cont.aciNameForKey("monPol", cont.env.ServiceBd())
1✔
224
}
1✔
225

226
func (cont *AciController) staticMonPolDn() string {
1✔
227
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
228
                return fmt.Sprintf("uni/tn-%s/ipslaMonitoringPol-%s",
1✔
229
                        cont.config.AciVrfTenant, cont.staticMonPolName())
1✔
230
        }
1✔
231
        return ""
×
232
}
233

234
func (cont *AciController) staticServiceObjs() apicapi.ApicSlice {
1✔
235
        var serviceObjs apicapi.ApicSlice
1✔
236

1✔
237
        // Service bridge domain
1✔
238
        bdName := cont.aciNameForKey("bd", cont.env.ServiceBd())
1✔
239
        bd := apicapi.NewFvBD(cont.config.AciVrfTenant, bdName)
1✔
240
        if apicapi.ApicVersion >= "6.0(4c)" {
1✔
UNCOV
241
                bd.SetAttr("serviceBdRoutingDisable", "yes")
×
242
        }
×
243
        bd.SetAttr("arpFlood", "yes")
1✔
244
        bd.SetAttr("ipLearning", "no")
1✔
245
        bd.SetAttr("unkMacUcastAct", cont.config.UnknownMacUnicastAction)
1✔
246
        bdToOut := apicapi.NewRsBdToOut(bd.GetDn(), cont.config.AciL3Out)
1✔
247
        bd.AddChild(bdToOut)
1✔
248
        bdToVrf := apicapi.NewRsCtx(bd.GetDn(), cont.config.AciVrf)
1✔
249
        bd.AddChild(bdToVrf)
1✔
250

1✔
251
        bdn := bd.GetDn()
1✔
252
        for _, cidr := range cont.config.NodeServiceSubnets {
2✔
253
                sn := apicapi.NewFvSubnet(bdn, cidr)
1✔
254
                bd.AddChild(sn)
1✔
255
        }
1✔
256
        serviceObjs = append(serviceObjs, bd)
1✔
257

1✔
258
        // Service IP SLA monitoring policy
1✔
259
        if cont.config.AciServiceMonitorInterval > 0 {
2✔
260
                monPol := apicapi.NewFvIPSLAMonitoringPol(cont.config.AciVrfTenant,
1✔
261
                        cont.staticMonPolName())
1✔
262
                monPol.SetAttr("slaFrequency",
1✔
263
                        strconv.Itoa(cont.config.AciServiceMonitorInterval))
1✔
264
                serviceObjs = append(serviceObjs, monPol)
1✔
265
        }
1✔
266

267
        return serviceObjs
1✔
268
}
269

270
func (cont *AciController) initStaticServiceObjs() {
1✔
271
        cont.apicConn.WriteApicObjects(cont.config.AciPrefix+"_service_static",
1✔
272
                cont.staticServiceObjs())
1✔
273
}
1✔
274

275
func (cont *AciController) updateServicesForNode(nodename string) {
1✔
276
        cont.serviceEndPoints.UpdateServicesForNode(nodename)
1✔
277
}
1✔
278

279
// must have index lock
280
func (cont *AciController) getActiveFabricPathDn(node string) string {
×
281
        var fabricPathDn string
×
282
        sz := len(cont.nodeOpflexDevice[node])
×
283
        for i := range cont.nodeOpflexDevice[node] {
×
284
                device := cont.nodeOpflexDevice[node][sz-1-i]
×
285
                if device.GetAttrStr("state") == "connected" {
×
286
                        fabricPathDn = device.GetAttrStr("fabricPathDn")
×
287
                        break
×
288
                }
289
        }
290
        return fabricPathDn
×
291
}
292

293
func (cont *AciController) getOpflexOdevCount(node string) (int, string) {
×
294
        fabricPathDnCount := make(map[string]int)
×
295
        var maxCount int
×
296
        var fabricPathDn string
×
297
        for _, device := range cont.nodeOpflexDevice[node] {
×
298
                fabricpathdn := device.GetAttrStr("fabricPathDn")
×
299
                count, ok := fabricPathDnCount[fabricpathdn]
×
UNCOV
300
                if ok {
×
UNCOV
301
                        fabricPathDnCount[fabricpathdn] = count + 1
×
UNCOV
302
                } else {
×
303
                        fabricPathDnCount[fabricpathdn] = 1
×
UNCOV
304
                }
×
UNCOV
305
                if fabricPathDnCount[fabricpathdn] >= maxCount {
×
306
                        maxCount = fabricPathDnCount[fabricpathdn]
×
307
                        fabricPathDn = fabricpathdn
×
308
                }
×
309
        }
310
        return maxCount, fabricPathDn
×
311
}
312

313
func deleteDevicesFromList(delDevices, devices apicapi.ApicSlice) apicapi.ApicSlice {
×
314
        var newDevices apicapi.ApicSlice
×
315
        for _, device := range devices {
×
316
                found := false
×
317
                for _, delDev := range delDevices {
×
318
                        if reflect.DeepEqual(delDev, device) {
×
319
                                found = true
×
320
                        }
×
321
                }
322
                if !found {
×
323
                        newDevices = append(newDevices, device)
×
324
                }
×
325
        }
326
        return newDevices
×
327
}
328

UNCOV
329
func (cont *AciController) getAciPodSubnet(pod string) (string, error) {
×
330
        podslice := strings.Split(pod, "-")
×
331
        if len(podslice) < 2 {
×
332
                return "", fmt.Errorf("Failed to get podid from pod")
×
333
        }
×
334
        podid := podslice[1]
×
UNCOV
335
        var subnet string
×
336
        args := []string{
×
337
                "query-target=self",
×
338
        }
×
339
        url := fmt.Sprintf("/api/node/mo/uni/controller/setuppol/setupp-%s.json?%s", podid, strings.Join(args, "&"))
×
340
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
341
        if err != nil {
×
342
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
343
                return subnet, err
×
UNCOV
344
        }
×
UNCOV
345
        for _, obj := range apicresp.Imdata {
×
UNCOV
346
                for _, body := range obj {
×
UNCOV
347
                        tepPool, ok := body.Attributes["tepPool"].(string)
×
348
                        if ok {
×
349
                                subnet = tepPool
×
350
                                break
×
351
                        }
352
                }
353
        }
354
        return subnet, nil
×
355
}
356

UNCOV
357
func (cont *AciController) isSingleOpflexOdev(fabricPathDn string) (bool, error) {
×
UNCOV
358
        pathSlice := strings.Split(fabricPathDn, "/")
×
359
        if len(pathSlice) > 2 {
×
360
                path := pathSlice[2]
×
361
                // topology/<aci_pod_name>/paths-<id>/pathep-<iface> - fabricPathDn if
×
362
                // host is connnected via single link
×
363
                // topology/<aci_pod_name>/protpaths-<id_1>-<id_2>/pathep-<iface> - fabricPathDn if
×
364
                // host is connected to vpc pair
×
365
                if strings.Contains(path, "protpaths-") {
×
366
                        args := []string{
×
367
                                "query-target=self",
×
368
                        }
×
369
                        url := fmt.Sprintf("/api/node/class/vpcDom.json?%s", strings.Join(args, "&"))
×
UNCOV
370
                        apicresp, err := cont.apicConn.GetApicResponse(url)
×
371
                        if err != nil {
×
372
                                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
373
                                return false, err
×
374
                        }
×
375
                        nodeIdSlice := strings.Split(path, "-")
×
376
                        if len(nodeIdSlice) != 3 {
×
377
                                cont.log.Error("Invalid fabricPathDn ", fabricPathDn)
×
378
                                return false, fmt.Errorf("Invalid path in fabricPathDn %s", fabricPathDn)
×
379
                        }
×
380
                        // As host is connected to vpc pair, check if the status of any of the vpcDom is down
381
                        upCount := 0
×
382
                        for i, nodeId := range nodeIdSlice {
×
UNCOV
383
                                instDn := fmt.Sprintf("topology/%s/node-%s/sys/vpc/inst", pathSlice[1], nodeId)
×
UNCOV
384
                                if i == 0 {
×
385
                                        continue
×
386
                                }
387
                                for _, obj := range apicresp.Imdata {
×
388
                                        for _, body := range obj {
×
389
                                                dn, ok := body.Attributes["dn"].(string)
×
390
                                                if ok && strings.Contains(dn, instDn) {
×
391
                                                        peerSt, ok := body.Attributes["peerSt"].(string)
×
392
                                                        if ok && peerSt == "up" {
×
393
                                                                upCount++
×
394
                                                        }
×
395
                                                }
396
                                        }
397
                                }
398
                        }
399
                        if upCount < 2 {
×
400
                                return true, nil
×
401
                        }
×
402
                        return false, nil
×
403
                } else {
×
404
                        return true, nil
×
405
                }
×
406
        }
407
        return false, fmt.Errorf("Invalid fabricPathDn %s ", fabricPathDn)
×
408
}
409

410
func (cont *AciController) createAciPodAnnotation(node string) (aciPodAnnot, error) {
×
411
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
412
        nodeAciPodAnnot := cont.nodeACIPod[node]
×
413
        isSingleOdev := false
×
414
        if odevCount == 1 {
×
UNCOV
415
                var err error
×
416
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
UNCOV
417
                if err != nil {
×
UNCOV
418
                        cont.log.Error(err)
×
419
                        return nodeAciPodAnnot, err
×
420
                }
×
421
        }
422
        if (odevCount == 0) ||
×
423
                (odevCount == 1 && !isSingleOdev) {
×
424
                if nodeAciPodAnnot.aciPod != "none" {
×
425
                        if nodeAciPodAnnot.disconnectTime.IsZero() {
×
426
                                nodeAciPodAnnot.disconnectTime = time.Now()
×
427
                        } else {
×
428
                                currentTime := time.Now()
×
429
                                diff := currentTime.Sub(nodeAciPodAnnot.disconnectTime)
×
UNCOV
430
                                if diff.Seconds() > float64(cont.config.OpflexDeviceReconnectWaitTimeout) {
×
431
                                        nodeAciPodAnnot.aciPod = "none"
×
432
                                        nodeAciPodAnnot.disconnectTime = time.Time{}
×
433
                                }
×
434
                        }
435
                }
436
                return nodeAciPodAnnot, nil
×
437
        } else if (odevCount == 2) ||
×
438
                (odevCount == 1 && isSingleOdev) {
×
439
                // when there is already a connected opflex device,
×
440
                // fabricPathDn will have latest pod iformation
×
441
                // and annotation will be in the form pod-<podid>-<subnet of pod>
×
442
                nodeAciPodAnnot.disconnectTime = time.Time{}
×
443
                nodeAciPod := nodeAciPodAnnot.aciPod
×
444
                pathSlice := strings.Split(fabricPathDn, "/")
×
445
                if len(pathSlice) > 1 {
×
446
                        pod := pathSlice[1]
×
447

×
UNCOV
448
                        // when there is difference in pod info avaliable from fabricPathDn
×
449
                        // and what we have in cache, update info in cache and change annotation on node
×
UNCOV
450
                        if !strings.Contains(nodeAciPod, pod) {
×
UNCOV
451
                                subnet, err := cont.getAciPodSubnet(pod)
×
452
                                if err != nil {
×
453
                                        cont.log.Error("Failed to get subnet of aci pod ", err.Error())
×
454
                                        return nodeAciPodAnnot, err
×
455
                                } else {
×
456
                                        nodeAciPodAnnot.aciPod = pod + "-" + subnet
×
457
                                        return nodeAciPodAnnot, nil
×
458
                                }
×
459
                        } else {
×
460
                                return nodeAciPodAnnot, nil
×
UNCOV
461
                        }
×
462
                } else {
×
463
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
464
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
465
                }
×
466
        }
467
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
468
}
469

470
func (cont *AciController) createNodeAciPodAnnotation(node string) (aciPodAnnot, error) {
×
471
        odevCount, fabricPathDn := cont.getOpflexOdevCount(node)
×
472
        nodeAciPodAnnot := cont.nodeACIPodAnnot[node]
×
473
        isSingleOdev := false
×
474
        if odevCount == 1 {
×
475
                var err error
×
476
                isSingleOdev, err = cont.isSingleOpflexOdev(fabricPathDn)
×
477
                if err != nil {
×
478
                        cont.log.Error(err)
×
479
                        return nodeAciPodAnnot, err
×
480
                }
×
481
        }
UNCOV
482
        if (odevCount == 0) ||
×
483
                (odevCount == 1 && !isSingleOdev) {
×
484
                if nodeAciPodAnnot.aciPod != "none" {
×
485
                        nodeAciPodAnnot.aciPod = "none"
×
486
                }
×
487
                return nodeAciPodAnnot, nil
×
UNCOV
488
        } else if (odevCount == 2) ||
×
UNCOV
489
                (odevCount == 1 && isSingleOdev) {
×
UNCOV
490
                pathSlice := strings.Split(fabricPathDn, "/")
×
491
                if len(pathSlice) > 1 {
×
492

×
493
                        nodeAciPodAnnot.aciPod = pathSlice[1]
×
494
                        return nodeAciPodAnnot, nil
×
495
                } else {
×
496
                        cont.log.Error("Invalid fabricPathDn of opflexOdev of node ", node)
×
497
                        return nodeAciPodAnnot, fmt.Errorf("Invalid fabricPathDn of opflexOdev")
×
498
                }
×
499
        }
UNCOV
500
        return nodeAciPodAnnot, fmt.Errorf("Failed to get annotation for node %s", node)
×
501
}
502

503
func (cont *AciController) checkChangeOfOpflexOdevAciPod() {
×
504
        var nodeAnnotationUpdates []string
×
505
        cont.apicConn.SyncMutex.Lock()
×
506
        syncDone := cont.apicConn.SyncDone
×
507
        cont.apicConn.SyncMutex.Unlock()
×
508

×
509
        if !syncDone {
×
510
                return
×
511
        }
×
512

513
        cont.indexMutex.Lock()
×
514
        for node := range cont.nodeACIPodAnnot {
×
515
                annot, err := cont.createNodeAciPodAnnotation(node)
×
516
                if err != nil {
×
517
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
518
                                now := time.Now()
×
519
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
UNCOV
520
                                        annot.lastErrorTime = now
×
UNCOV
521
                                        cont.nodeACIPodAnnot[node] = annot
×
522
                                        cont.log.Error(err.Error())
×
523
                                }
×
524
                        } else {
×
525
                                cont.log.Error(err.Error())
×
526
                        }
×
UNCOV
527
                } else {
×
UNCOV
528
                        if annot != cont.nodeACIPodAnnot[node] {
×
UNCOV
529
                                cont.nodeACIPodAnnot[node] = annot
×
UNCOV
530
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
UNCOV
531
                        }
×
532
                }
533
        }
534
        cont.indexMutex.Unlock()
×
535
        if len(nodeAnnotationUpdates) > 0 {
×
536
                for _, updatednode := range nodeAnnotationUpdates {
×
537
                        go cont.env.NodeAnnotationChanged(updatednode)
×
538
                }
×
539
        }
540
}
541

542
func (cont *AciController) checkChangeOfOdevAciPod() {
×
543
        var nodeAnnotationUpdates []string
×
UNCOV
544
        cont.apicConn.SyncMutex.Lock()
×
545
        syncDone := cont.apicConn.SyncDone
×
546
        cont.apicConn.SyncMutex.Unlock()
×
547

×
548
        if !syncDone {
×
549
                return
×
UNCOV
550
        }
×
551

552
        cont.indexMutex.Lock()
×
553
        for node := range cont.nodeACIPod {
×
554
                annot, err := cont.createAciPodAnnotation(node)
×
555
                if err != nil {
×
556
                        if strings.Contains(fmt.Sprint(err), "Failed to get annotation") {
×
557
                                now := time.Now()
×
558
                                if annot.lastErrorTime.IsZero() || now.Sub(annot.lastErrorTime).Seconds() >= 60 {
×
559
                                        annot.lastErrorTime = now
×
UNCOV
560
                                        cont.nodeACIPod[node] = annot
×
UNCOV
561
                                        cont.log.Error(err.Error())
×
UNCOV
562
                                }
×
UNCOV
563
                        } else {
×
UNCOV
564
                                cont.log.Error(err.Error())
×
565
                        }
×
566
                } else {
×
UNCOV
567
                        if annot != cont.nodeACIPod[node] {
×
UNCOV
568
                                cont.nodeACIPod[node] = annot
×
UNCOV
569
                                nodeAnnotationUpdates = append(nodeAnnotationUpdates, node)
×
UNCOV
570
                        }
×
571
                }
572
        }
573
        cont.indexMutex.Unlock()
×
574
        if len(nodeAnnotationUpdates) > 0 {
×
575
                for _, updatednode := range nodeAnnotationUpdates {
×
576
                        go cont.env.NodeAnnotationChanged(updatednode)
×
UNCOV
577
                }
×
578
        }
579
}
580

581
func (cont *AciController) deleteOldOpflexDevices() {
1✔
582
        var nodeUpdates []string
1✔
583
        cont.indexMutex.Lock()
1✔
584
        for node, devices := range cont.nodeOpflexDevice {
1✔
UNCOV
585
                var delDevices apicapi.ApicSlice
×
UNCOV
586
                fabricPathDn := cont.getActiveFabricPathDn(node)
×
UNCOV
587
                if fabricPathDn != "" {
×
UNCOV
588
                        for _, device := range devices {
×
UNCOV
589
                                if device.GetAttrStr("delete") == "true" && device.GetAttrStr("fabricPathDn") != fabricPathDn {
×
UNCOV
590
                                        deleteTimeStr := device.GetAttrStr("deleteTime")
×
UNCOV
591
                                        deleteTime, err := time.Parse(time.RFC3339, deleteTimeStr)
×
UNCOV
592
                                        if err != nil {
×
UNCOV
593
                                                cont.log.Error("Failed to parse opflex device delete time: ", err)
×
UNCOV
594
                                                continue
×
595
                                        }
UNCOV
596
                                        now := time.Now()
×
UNCOV
597
                                        diff := now.Sub(deleteTime)
×
UNCOV
598
                                        if diff.Seconds() >= cont.config.OpflexDeviceDeleteTimeout {
×
UNCOV
599
                                                delDevices = append(delDevices, device)
×
UNCOV
600
                                        }
×
601
                                }
602
                        }
UNCOV
603
                        if len(delDevices) > 0 {
×
UNCOV
604
                                newDevices := deleteDevicesFromList(delDevices, devices)
×
UNCOV
605
                                cont.nodeOpflexDevice[node] = newDevices
×
UNCOV
606
                                cont.log.Info("Opflex device list for node ", node, " after deleting stale entries: ", cont.nodeOpflexDevice[node])
×
UNCOV
607
                                if len(newDevices) == 0 {
×
UNCOV
608
                                        delete(cont.nodeOpflexDevice, node)
×
UNCOV
609
                                }
×
UNCOV
610
                                nodeUpdates = append(nodeUpdates, node)
×
611
                        }
612
                }
613
        }
614
        cont.indexMutex.Unlock()
1✔
615
        if len(nodeUpdates) > 0 {
1✔
UNCOV
616
                cont.postOpflexDeviceDelete(nodeUpdates)
×
UNCOV
617
        }
×
618
}
619

620
// must have index lock
621
func (cont *AciController) setDeleteFlagForOldDevices(node, fabricPathDn string) {
1✔
622
        for _, device := range cont.nodeOpflexDevice[node] {
2✔
623
                if device.GetAttrStr("fabricPathDn") != fabricPathDn {
1✔
UNCOV
624
                        t := time.Now()
×
UNCOV
625
                        device.SetAttr("delete", "true")
×
UNCOV
626
                        device.SetAttr("deleteTime", t.Format(time.RFC3339))
×
UNCOV
627
                }
×
628
        }
629
}
630

631
// must have index lock
632
func (cont *AciController) fabricPathForNode(name string) (string, bool) {
1✔
633
        sz := len(cont.nodeOpflexDevice[name])
1✔
634
        for i := range cont.nodeOpflexDevice[name] {
2✔
635
                device := cont.nodeOpflexDevice[name][sz-1-i]
1✔
636
                deviceState := device.GetAttrStr("state")
1✔
637
                if deviceState == "connected" {
2✔
638
                        if deviceState != device.GetAttrStr("prevState") {
2✔
639
                                cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Info("Processing fabric path for node ",
1✔
640
                                        "when connected device state is found")
1✔
641
                                device.SetAttr("prevState", deviceState)
1✔
642
                        }
1✔
643
                        fabricPathDn := device.GetAttrStr("fabricPathDn")
1✔
644
                        cont.setDeleteFlagForOldDevices(name, fabricPathDn)
1✔
645
                        return fabricPathDn, true
1✔
646
                } else {
1✔
647
                        device.SetAttr("prevState", deviceState)
1✔
648
                }
1✔
649
        }
650
        if sz > 0 {
2✔
651
                // When the opflex-device for a node changes, for example during a live migration,
1✔
652
                // we end up with both the old and the new device objects till the old object
1✔
653
                // ages out on APIC. The new object is at end of the devices list (see opflexDeviceChanged),
1✔
654
                // so we return the fabricPathDn of the last opflex-device.
1✔
655
                cont.fabricPathLogger(cont.nodeOpflexDevice[name][sz-1].GetAttrStr("hostName"),
1✔
656
                        cont.nodeOpflexDevice[name][sz-1]).Info("Processing fabricPathDn for node")
1✔
657
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("fabricPathDn"), true
1✔
658
        }
1✔
659
        return "", false
1✔
660
}
661

662
// must have index lock
663
func (cont *AciController) deviceMacForNode(name string) (string, bool) {
1✔
664
        sz := len(cont.nodeOpflexDevice[name])
1✔
665
        if sz > 0 {
2✔
666
                // When the opflex-device for a node changes, for example when the
1✔
667
                // node is recreated, we end up with both the old and the new
1✔
668
                // device objects till the old object ages out on APIC. The
1✔
669
                // new object is at end of the devices list (see
1✔
670
                // opflexDeviceChanged), so we return the MAC address of the
1✔
671
                // last opflex-device.
1✔
672
                return cont.nodeOpflexDevice[name][sz-1].GetAttrStr("mac"), true
1✔
673
        }
1✔
674
        return "", false
1✔
675
}
676

677
func apicRedirectDst(rpDn string, ip string, mac string,
678
        descr string, healthGroupDn string, enablePbrTracking bool) apicapi.ApicObject {
1✔
679
        dst := apicapi.NewVnsRedirectDest(rpDn, ip, mac).SetAttr("descr", descr)
1✔
680
        if healthGroupDn != "" && enablePbrTracking {
2✔
681
                dst.AddChild(apicapi.NewVnsRsRedirectHealthGroup(dst.GetDn(),
1✔
682
                        healthGroupDn))
1✔
683
        }
1✔
684
        return dst
1✔
685
}
686

687
func (cont *AciController) apicRedirectPol(name string, tenantName string, nodes []string,
688
        nodeMap map[string]*metadata.ServiceEndpoint,
689
        monPolDn string, enablePbrTracking bool) (apicapi.ApicObject, string) {
1✔
690
        rp := apicapi.NewVnsSvcRedirectPol(tenantName, name)
1✔
691
        rp.SetAttr("thresholdDownAction", "deny")
1✔
692
        if cont.config.DisableResilientHashing {
1✔
UNCOV
693
                rp.SetAttr("resilientHashEnabled", "no")
×
UNCOV
694
        }
×
695
        rpDn := rp.GetDn()
1✔
696
        for _, node := range nodes {
2✔
697
                cont.indexMutex.Lock()
1✔
698
                serviceEp, ok := nodeMap[node]
1✔
699
                if !ok {
1✔
UNCOV
700
                        continue
×
701
                }
702
                if serviceEp.Ipv4 != nil {
2✔
703
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv4.String(),
1✔
704
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
1✔
705
                }
1✔
706
                if serviceEp.Ipv6 != nil {
1✔
UNCOV
707
                        rp.AddChild(apicRedirectDst(rpDn, serviceEp.Ipv6.String(),
×
UNCOV
708
                                serviceEp.Mac, node, serviceEp.HealthGroupDn, enablePbrTracking))
×
UNCOV
709
                }
×
710
                cont.indexMutex.Unlock()
1✔
711
        }
712
        if monPolDn != "" && enablePbrTracking {
2✔
713
                rp.AddChild(apicapi.NewVnsRsIPSLAMonitoringPol(rpDn, monPolDn))
1✔
714
        }
1✔
715
        return rp, rpDn
1✔
716
}
717

718
func apicExtNetCreate(enDn string, ingress string, ipv4 bool,
719
        cidr bool, sharedSec bool) apicapi.ApicObject {
1✔
720
        if !cidr {
2✔
721
                if ipv4 {
2✔
722
                        ingress += "/32"
1✔
723
                } else {
1✔
UNCOV
724
                        ingress += "/128"
×
UNCOV
725
                }
×
726
        }
727
        subnet := apicapi.NewL3extSubnet(enDn, ingress, "", "")
1✔
728
        if sharedSec {
2✔
729
                subnet.SetAttr("scope", "import-security,shared-security")
1✔
730
        }
1✔
731
        return subnet
1✔
732
}
733

734
func apicExtNet(name string, tenantName string, l3Out string,
735
        ingresses []string, sharedSecurity bool, snat bool) apicapi.ApicObject {
1✔
736
        en := apicapi.NewL3extInstP(tenantName, l3Out, name)
1✔
737
        enDn := en.GetDn()
1✔
738
        if snat {
2✔
739
                en.AddChild(apicapi.NewFvRsCons(enDn, name))
1✔
740
        } else {
2✔
741
                en.AddChild(apicapi.NewFvRsProv(enDn, name))
1✔
742
        }
1✔
743

744
        for _, ingress := range ingresses {
2✔
745
                ip, _, _ := net.ParseCIDR(ingress)
1✔
746
                // If ingress is a subnet
1✔
747
                if ip != nil {
2✔
748
                        if ip != nil && ip.To4() != nil {
2✔
749
                                subnet := apicExtNetCreate(enDn, ingress, true, true, sharedSecurity)
1✔
750
                                en.AddChild(subnet)
1✔
751
                        } else if ip != nil && ip.To16() != nil {
1✔
752
                                subnet := apicExtNetCreate(enDn, ingress, false, true, sharedSecurity)
×
753
                                en.AddChild(subnet)
×
754
                        }
×
755
                } else {
1✔
756
                        // If ingress is an IP address
1✔
757
                        ip := net.ParseIP(ingress)
1✔
758
                        if ip != nil && ip.To4() != nil {
2✔
759
                                subnet := apicExtNetCreate(enDn, ingress, true, false, sharedSecurity)
1✔
760
                                en.AddChild(subnet)
1✔
761
                        } else if ip != nil && ip.To16() != nil {
1✔
762
                                subnet := apicExtNetCreate(enDn, ingress, false, false, sharedSecurity)
×
763
                                en.AddChild(subnet)
×
764
                        }
×
765
                }
766
        }
767
        return en
1✔
768
}
769

770
func apicDefaultEgCons(conName string, tenantName string,
UNCOV
771
        appProfile string, epg string) apicapi.ApicObject {
×
UNCOV
772
        enDn := fmt.Sprintf("uni/tn-%s/ap-%s/epg-%s", tenantName, appProfile, epg)
×
UNCOV
773
        return apicapi.NewFvRsCons(enDn, conName)
×
774
}
×
775

776
func apicExtNetCons(conName string, tenantName string,
777
        l3Out string, net string) apicapi.ApicObject {
1✔
778
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
779
        return apicapi.NewFvRsCons(enDn, conName)
1✔
780
}
1✔
781

782
func apicExtNetProv(conName string, tenantName string,
783
        l3Out string, net string) apicapi.ApicObject {
1✔
784
        enDn := fmt.Sprintf("uni/tn-%s/out-%s/instP-%s", tenantName, l3Out, net)
1✔
785
        return apicapi.NewFvRsProv(enDn, conName)
1✔
786
}
1✔
787

788
// stringInSlice reports whether str is equal to at least one element of list.
// A nil or empty list never matches.
func stringInSlice(str string, list []string) bool {
	for idx := 0; idx < len(list); idx++ {
		if list[idx] == str {
			return true
		}
	}
	return false
}
797

798
// validScope reports whether scope is one of the accepted contract scope
// values: the empty string, "context", "tenant" or "global". The comparison is
// exact, so callers normalise case beforehand.
func validScope(scope string) bool {
	switch scope {
	case "", "context", "tenant", "global":
		return true
	}
	return false
}
1✔
802

UNCOV
803
func (cont *AciController) getGraphNameFromContract(name, tenantName string) (string, error) {
×
UNCOV
804
        var graphName string
×
UNCOV
805
        args := []string{
×
UNCOV
806
                "query-target=subtree",
×
UNCOV
807
        }
×
UNCOV
808
        url := fmt.Sprintf("/api/node/mo/uni/tn-%s/brc-%s.json?%s", tenantName, name, strings.Join(args, "&"))
×
UNCOV
809
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
UNCOV
810
        if err != nil {
×
UNCOV
811
                cont.log.Debug("Failed to get APIC response, err: ", err.Error())
×
UNCOV
812
                return graphName, err
×
UNCOV
813
        }
×
UNCOV
814
        for _, obj := range apicresp.Imdata {
×
UNCOV
815
                for class, body := range obj {
×
UNCOV
816
                        if class == "vzRsSubjGraphAtt" {
×
UNCOV
817
                                tnVnsAbsGraphName, ok := body.Attributes["tnVnsAbsGraphName"].(string)
×
UNCOV
818
                                if ok {
×
UNCOV
819
                                        graphName = tnVnsAbsGraphName
×
UNCOV
820
                                }
×
UNCOV
821
                                break
×
822
                        }
823
                }
824
        }
UNCOV
825
        cont.log.Debug("graphName: ", graphName)
×
UNCOV
826
        return graphName, err
×
827
}
828

829
func apicContract(conName string, tenantName string,
830
        graphName string, scopeName string, isSnatPbrFltrChain bool,
831
        customSGAnnot bool) apicapi.ApicObject {
1✔
832
        con := apicapi.NewVzBrCP(tenantName, conName)
1✔
833
        if scopeName != "" && scopeName != "context" {
2✔
834
                con.SetAttr("scope", scopeName)
1✔
835
        }
1✔
836
        cs := apicapi.NewVzSubj(con.GetDn(), "loadbalancedservice")
1✔
837
        csDn := cs.GetDn()
1✔
838
        if isSnatPbrFltrChain {
2✔
839
                cs.SetAttr("revFltPorts", "no")
1✔
840
                inTerm := apicapi.NewVzInTerm(csDn)
1✔
841
                outTerm := apicapi.NewVzOutTerm(csDn)
1✔
842
                inTerm.AddChild(apicapi.NewVzRsInTermGraphAtt(inTerm.GetDn(), graphName))
1✔
843
                inTerm.AddChild(apicapi.NewVzRsFiltAtt(inTerm.GetDn(), conName+"_fromCons-toProv"))
1✔
844
                outTerm.AddChild(apicapi.NewVzRsOutTermGraphAtt(outTerm.GetDn(), graphName))
1✔
845
                outTerm.AddChild(apicapi.NewVzRsFiltAtt(outTerm.GetDn(), conName+"_fromProv-toCons"))
1✔
846
                cs.AddChild(inTerm)
1✔
847
                cs.AddChild(outTerm)
1✔
848
        } else {
2✔
849
                cs.AddChild(apicapi.NewVzRsSubjGraphAtt(csDn, graphName, customSGAnnot))
1✔
850
                cs.AddChild(apicapi.NewVzRsSubjFiltAtt(csDn, conName))
1✔
851
        }
1✔
852
        con.AddChild(cs)
1✔
853
        return con
1✔
854
}
855

856
func apicDevCtx(name string, tenantName string,
857
        graphName string, deviceName string, bdName string, rpDn string, isSnatPbrFltrChain bool) apicapi.ApicObject {
1✔
858
        cc := apicapi.NewVnsLDevCtx(tenantName, name, graphName, "loadbalancer")
1✔
859
        ccDn := cc.GetDn()
1✔
860
        graphDn := fmt.Sprintf("uni/tn-%s/lDevVip-%s", tenantName, deviceName)
1✔
861
        lifDn := fmt.Sprintf("%s/lIf-%s", graphDn, "interface")
1✔
862
        bdDn := fmt.Sprintf("uni/tn-%s/BD-%s", tenantName, bdName)
1✔
863
        cc.AddChild(apicapi.NewVnsRsLDevCtxToLDev(ccDn, graphDn))
1✔
864
        rpDnBase := rpDn
1✔
865
        for _, ctxConn := range []string{"consumer", "provider"} {
2✔
866
                lifCtx := apicapi.NewVnsLIfCtx(ccDn, ctxConn)
1✔
867
                if isSnatPbrFltrChain {
2✔
868
                        if ctxConn == "consumer" {
2✔
869
                                rpDn = rpDnBase + "_Cons"
1✔
870
                        } else {
2✔
871
                                rpDn = rpDnBase + "_Prov"
1✔
872
                        }
1✔
873
                }
874
                lifCtxDn := lifCtx.GetDn()
1✔
875
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToSvcRedirectPol(lifCtxDn, rpDn))
1✔
876
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToBD(lifCtxDn, bdDn))
1✔
877
                lifCtx.AddChild(apicapi.NewVnsRsLIfCtxToLIf(lifCtxDn, lifDn))
1✔
878
                cc.AddChild(lifCtx)
1✔
879
        }
880
        return cc
1✔
881
}
882

883
func apicFilterEntry(filterDn string, count string, p_start string,
884
        p_end string, protocol string, stateful string, snat bool, outTerm bool) apicapi.ApicObject {
1✔
885
        fe := apicapi.NewVzEntry(filterDn, count)
1✔
886
        fe.SetAttr("etherT", "ip")
1✔
887
        fe.SetAttr("prot", protocol)
1✔
888
        if snat {
2✔
889
                if outTerm {
2✔
890
                        if protocol == "tcp" {
2✔
891
                                fe.SetAttr("tcpRules", "est")
1✔
892
                        }
1✔
893
                        // Reverse the ports for outTerm
894
                        fe.SetAttr("dFromPort", p_start)
1✔
895
                        fe.SetAttr("dToPort", p_end)
1✔
896
                } else {
1✔
897
                        fe.SetAttr("sFromPort", p_start)
1✔
898
                        fe.SetAttr("sToPort", p_end)
1✔
899
                }
1✔
900
        } else {
1✔
901
                fe.SetAttr("dFromPort", p_start)
1✔
902
                fe.SetAttr("dToPort", p_end)
1✔
903
        }
1✔
904
        fe.SetAttr("stateful", stateful)
1✔
905
        return fe
1✔
906
}
907
func apicFilter(name string, tenantName string,
908
        portSpec []v1.ServicePort, snat bool, snatRange portRangeSnat) apicapi.ApicObject {
1✔
909
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
910
        filterDn := filter.GetDn()
1✔
911

1✔
912
        var i int
1✔
913
        var port v1.ServicePort
1✔
914
        for i, port = range portSpec {
2✔
915
                pstr := strconv.Itoa(int(port.Port))
1✔
916
                proto := getProtocolStr(port.Protocol)
1✔
917
                fe := apicFilterEntry(filterDn, strconv.Itoa(i), pstr,
1✔
918
                        pstr, proto, "no", false, false)
1✔
919
                filter.AddChild(fe)
1✔
920
        }
1✔
921

922
        if snat {
1✔
923
                portSpec := []portRangeSnat{snatRange}
×
924
                p_start := strconv.Itoa(portSpec[0].start)
×
925
                p_end := strconv.Itoa(portSpec[0].end)
×
UNCOV
926

×
UNCOV
927
                fe1 := apicFilterEntry(filterDn, strconv.Itoa(i+1), p_start,
×
UNCOV
928
                        p_end, "tcp", "no", false, false)
×
UNCOV
929
                filter.AddChild(fe1)
×
UNCOV
930
                fe2 := apicFilterEntry(filterDn, strconv.Itoa(i+2), p_start,
×
UNCOV
931
                        p_end, "udp", "no", false, false)
×
UNCOV
932
                filter.AddChild(fe2)
×
UNCOV
933
        }
×
934
        return filter
1✔
935
}
936

937
func apicFilterSnat(name string, tenantName string,
938
        portSpec []portRangeSnat, outTerm bool) apicapi.ApicObject {
1✔
939
        filter := apicapi.NewVzFilter(tenantName, name)
1✔
940
        filterDn := filter.GetDn()
1✔
941

1✔
942
        p_start := strconv.Itoa(portSpec[0].start)
1✔
943
        p_end := strconv.Itoa(portSpec[0].end)
1✔
944

1✔
945
        fe := apicFilterEntry(filterDn, "0", p_start,
1✔
946
                p_end, "tcp", "no", true, outTerm)
1✔
947
        filter.AddChild(fe)
1✔
948
        fe1 := apicFilterEntry(filterDn, "1", p_start,
1✔
949
                p_end, "udp", "no", true, outTerm)
1✔
950
        filter.AddChild(fe1)
1✔
951

1✔
952
        return filter
1✔
953
}
1✔
954

955
func (cont *AciController) updateServiceDeviceInstance(key string,
956
        service *v1.Service) error {
1✔
957
        cont.indexMutex.Lock()
1✔
958
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
959
        cont.serviceEndPoints.GetnodesMetadata(key, service, nodeMap)
1✔
960
        cont.indexMutex.Unlock()
1✔
961

1✔
962
        var nodes []string
1✔
963
        for node := range nodeMap {
2✔
964
                nodes = append(nodes, node)
1✔
965
        }
1✔
966
        sort.Strings(nodes)
1✔
967
        name := cont.aciNameForKey("svc", key)
1✔
968
        var conScope string
1✔
969
        scopeVal, ok := service.ObjectMeta.Annotations[metadata.ServiceContractScopeAnnotation]
1✔
970
        if ok {
2✔
971
                normScopeVal := strings.ToLower(scopeVal)
1✔
972
                if !validScope(normScopeVal) {
1✔
UNCOV
973
                        errString := "Invalid service contract scope value provided " + scopeVal
×
UNCOV
974
                        err := errors.New(errString)
×
UNCOV
975
                        serviceLogger(cont.log, service).Error("Could not create contract: ", err)
×
UNCOV
976
                        return err
×
977
                } else {
1✔
978
                        conScope = normScopeVal
1✔
979
                }
1✔
980
        } else {
1✔
981
                conScope = DefaultServiceContractScope
1✔
982
        }
1✔
983

984
        var sharedSecurity bool
1✔
985
        if conScope == "global" {
2✔
986
                sharedSecurity = true
1✔
987
        } else {
2✔
988
                sharedSecurity = DefaultServiceExtSubNetShared
1✔
989
        }
1✔
990

991
        graphName := cont.aciNameForKey("svc", "global")
1✔
992
        deviceName := cont.aciNameForKey("svc", "global")
1✔
993
        _, customSGAnnPresent := service.ObjectMeta.Annotations[metadata.ServiceGraphNameAnnotation]
1✔
994
        if customSGAnnPresent {
1✔
995
                customSG, err := cont.getGraphNameFromContract(name, cont.config.AciVrfTenant)
×
996
                if err == nil {
×
997
                        graphName = customSG
×
998
                }
×
999
        }
1000
        cont.log.Debug("Using service graph ", graphName, " for service ", key)
1✔
1001

1✔
1002
        var serviceObjs apicapi.ApicSlice
1✔
1003
        if len(nodes) > 0 {
2✔
1004
                // 1. Service redirect policy
1✔
1005
                // The service redirect policy contains the MAC address
1✔
1006
                // and IP address of each of the service endpoints for
1✔
1007
                // each node that hosts a pod for this service.  The
1✔
1008
                // example below shows the case of two nodes.
1✔
1009
                rp, rpDn :=
1✔
1010
                        cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
1✔
1011
                                nodeMap, cont.staticMonPolDn(), cont.config.AciPbrTrackingNonSnat)
1✔
1012
                serviceObjs = append(serviceObjs, rp)
1✔
1013

1✔
1014
                // 2. Service graph contract and external network
1✔
1015
                // The service graph contract must be bound to the service
1✔
1016
                // graph.  This contract must be consumed by the default
1✔
1017
                // layer 3 network and provided by the service layer 3
1✔
1018
                // network.
1✔
1019
                {
2✔
1020
                        var ingresses []string
1✔
1021
                        for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1022
                                ingresses = append(ingresses, ingress.IP)
1✔
1023
                        }
1✔
1024
                        serviceObjs = append(serviceObjs,
1✔
1025
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1026
                                        cont.config.AciL3Out, ingresses, sharedSecurity, false))
1✔
1027
                }
1028

1029
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, false, customSGAnnPresent)
1✔
1030
                serviceObjs = append(serviceObjs, contract)
1✔
1031
                for _, net := range cont.config.AciExtNetworks {
2✔
1032
                        serviceObjs = append(serviceObjs,
1✔
1033
                                apicExtNetCons(name, cont.config.AciVrfTenant,
1✔
1034
                                        cont.config.AciL3Out, net))
1✔
1035
                }
1✔
1036

1037
                if cont.config.AddExternalContractToDefaultEPG && service.Spec.Type == v1.ServiceTypeLoadBalancer {
1✔
UNCOV
1038
                        defaultEpgTenant := cont.config.DefaultEg.PolicySpace
×
UNCOV
1039
                        defaultEpgStringSplit := strings.Split(cont.config.DefaultEg.Name, "|")
×
UNCOV
1040
                        var defaultEpgName, appProfile string
×
UNCOV
1041
                        if len(defaultEpgStringSplit) > 1 {
×
UNCOV
1042
                                appProfile = defaultEpgStringSplit[0]
×
UNCOV
1043
                                defaultEpgName = defaultEpgStringSplit[1]
×
UNCOV
1044
                        } else {
×
UNCOV
1045
                                appProfile = cont.config.AppProfile
×
UNCOV
1046
                                defaultEpgName = defaultEpgStringSplit[0]
×
1047
                        }
×
UNCOV
1048
                        serviceObjs = append(serviceObjs,
×
UNCOV
1049
                                apicDefaultEgCons(name, defaultEpgTenant, appProfile, defaultEpgName))
×
1050
                }
1051

1052
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1053
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1054

1✔
1055
                _, snat := cont.snatServices[key]
1✔
1056
                filter := apicFilter(name, cont.config.AciVrfTenant,
1✔
1057
                        service.Spec.Ports, snat, defaultPortRange)
1✔
1058
                serviceObjs = append(serviceObjs, filter)
1✔
1059

1✔
1060
                // 3. Device cluster context
1✔
1061
                // The logical device context binds the service contract
1✔
1062
                // to the redirect policy and the device cluster and
1✔
1063
                // bridge domain for the device cluster.
1✔
1064
                serviceObjs = append(serviceObjs,
1✔
1065
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, deviceName,
1✔
1066
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, false))
1✔
1067
        }
1068

1069
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1070
        return nil
1✔
1071
}
1072

1073
func (cont *AciController) updateServiceDeviceInstanceSnat(key string) error {
1✔
1074
        nodeList := cont.nodeIndexer.List()
1✔
1075
        cont.indexMutex.Lock()
1✔
1076
        if len(cont.nodeServiceMetaCache) == 0 {
2✔
1077
                cont.indexMutex.Unlock()
1✔
1078
                return nil
1✔
1079
        }
1✔
1080
        nodeMap := make(map[string]*metadata.ServiceEndpoint)
1✔
1081
        sort.Slice(nodeList, func(i, j int) bool {
2✔
1082
                nodeA := nodeList[i].(*v1.Node)
1✔
1083
                nodeB := nodeList[j].(*v1.Node)
1✔
1084
                return nodeA.ObjectMeta.Name < nodeB.ObjectMeta.Name
1✔
1085
        })
1✔
1086
        for itr, nodeItem := range nodeList {
2✔
1087
                if itr == cont.config.MaxSvcGraphNodes {
1✔
UNCOV
1088
                        break
×
1089
                }
1090
                node := nodeItem.(*v1.Node)
1✔
1091
                nodeName := node.ObjectMeta.Name
1✔
1092
                nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
1093
                if !ok {
2✔
1094
                        continue
1✔
1095
                }
1096
                _, ok = cont.fabricPathForNode(nodeName)
1✔
1097
                if !ok {
1✔
UNCOV
1098
                        continue
×
1099
                }
1100
                nodeLabels := node.ObjectMeta.Labels
1✔
1101
                excludeNode := cont.nodeLabelsInExcludeList(nodeLabels)
1✔
1102
                if excludeNode {
1✔
UNCOV
1103
                        continue
×
1104
                }
1105
                nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
1106
        }
1107
        cont.indexMutex.Unlock()
1✔
1108

1✔
1109
        var nodes []string
1✔
1110
        for node := range nodeMap {
2✔
1111
                nodes = append(nodes, node)
1✔
1112
        }
1✔
1113
        sort.Strings(nodes)
1✔
1114
        name := cont.aciNameForKey("snat", key)
1✔
1115
        var conScope = cont.config.SnatSvcContractScope
1✔
1116
        sharedSecurity := true
1✔
1117

1✔
1118
        graphName := cont.aciNameForKey("svc", "global")
1✔
1119
        var serviceObjs apicapi.ApicSlice
1✔
1120
        if len(nodes) > 0 {
2✔
1121
                // 1. Service redirect policy
1✔
1122
                // The service redirect policy contains the MAC address
1✔
1123
                // and IP address of each of the service endpoints for
1✔
1124
                // each node that hosts a pod for this service.
1✔
1125
                // For SNAT with the introduction of filter-chain usage, to work-around
1✔
1126
                // an APIC limitation, creating two PBR policies with same nodes.
1✔
1127
                var rpDn string
1✔
1128
                var rp apicapi.ApicObject
1✔
1129
                if cont.apicConn.SnatPbrFltrChain {
2✔
1130
                        rpCons, rpDnCons :=
1✔
1131
                                cont.apicRedirectPol(name+"_Cons", cont.config.AciVrfTenant, nodes,
1✔
1132
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1133
                        serviceObjs = append(serviceObjs, rpCons)
1✔
1134
                        rpProv, _ :=
1✔
1135
                                cont.apicRedirectPol(name+"_Prov", cont.config.AciVrfTenant, nodes,
1✔
1136
                                        nodeMap, cont.staticMonPolDn(), true)
1✔
1137
                        serviceObjs = append(serviceObjs, rpProv)
1✔
1138
                        rpDn = strings.TrimSuffix(rpDnCons, "_Cons")
1✔
1139
                } else {
1✔
UNCOV
1140
                        rp, rpDn =
×
UNCOV
1141
                                cont.apicRedirectPol(name, cont.config.AciVrfTenant, nodes,
×
UNCOV
1142
                                        nodeMap, cont.staticMonPolDn(), true)
×
UNCOV
1143
                        serviceObjs = append(serviceObjs, rp)
×
UNCOV
1144
                }
×
1145
                // 2. Service graph contract and external network
1146
                // The service graph contract must be bound to the
1147
                // service
1148
                // graph.  This contract must be consumed by the default
1149
                // layer 3 network and provided by the service layer 3
1150
                // network.
1151
                {
1✔
1152
                        var ingresses []string
1✔
1153
                        for _, policy := range cont.snatPolicyCache {
2✔
1154
                                ingresses = append(ingresses, policy.SnatIp...)
1✔
1155
                        }
1✔
1156
                        serviceObjs = append(serviceObjs,
1✔
1157
                                apicExtNet(name, cont.config.AciVrfTenant,
1✔
1158
                                        cont.config.AciL3Out, ingresses, sharedSecurity, true))
1✔
1159
                }
1160

1161
                contract := apicContract(name, cont.config.AciVrfTenant, graphName, conScope, cont.apicConn.SnatPbrFltrChain, false)
1✔
1162
                serviceObjs = append(serviceObjs, contract)
1✔
1163

1✔
1164
                for _, net := range cont.config.AciExtNetworks {
2✔
1165
                        serviceObjs = append(serviceObjs,
1✔
1166
                                apicExtNetProv(name, cont.config.AciVrfTenant,
1✔
1167
                                        cont.config.AciL3Out, net))
1✔
1168
                }
1✔
1169

1170
                defaultPortRange := portRangeSnat{start: cont.config.SnatDefaultPortRangeStart,
1✔
1171
                        end: cont.config.SnatDefaultPortRangeEnd}
1✔
1172
                portSpec := []portRangeSnat{defaultPortRange}
1✔
1173
                if cont.apicConn.SnatPbrFltrChain {
2✔
1174
                        filterIn := apicFilterSnat(name+"_fromCons-toProv", cont.config.AciVrfTenant, portSpec, false)
1✔
1175
                        serviceObjs = append(serviceObjs, filterIn)
1✔
1176
                        filterOut := apicFilterSnat(name+"_fromProv-toCons", cont.config.AciVrfTenant, portSpec, true)
1✔
1177
                        serviceObjs = append(serviceObjs, filterOut)
1✔
1178
                } else {
1✔
UNCOV
1179
                        filter := apicFilterSnat(name, cont.config.AciVrfTenant, portSpec, false)
×
UNCOV
1180
                        serviceObjs = append(serviceObjs, filter)
×
UNCOV
1181
                }
×
1182
                // 3. Device cluster context
1183
                // The logical device context binds the service contract
1184
                // to the redirect policy and the device cluster and
1185
                // bridge domain for the device cluster.
1186
                serviceObjs = append(serviceObjs,
1✔
1187
                        apicDevCtx(name, cont.config.AciVrfTenant, graphName, graphName,
1✔
1188
                                cont.aciNameForKey("bd", cont.env.ServiceBd()), rpDn, cont.apicConn.SnatPbrFltrChain))
1✔
1189
        }
1190
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1191
        return nil
1✔
1192
}
1193

1194
func (cont *AciController) nodeLabelsInExcludeList(Labels map[string]string) bool {
1✔
1195
        nodeSnatRedirectExclude := cont.config.NodeSnatRedirectExclude
1✔
1196

1✔
1197
        for _, nodeGroup := range nodeSnatRedirectExclude {
1✔
UNCOV
1198
                if len(nodeGroup.Labels) == 0 {
×
UNCOV
1199
                        continue
×
1200
                }
UNCOV
1201
                matchFound := true
×
UNCOV
1202
                for _, label := range nodeGroup.Labels {
×
UNCOV
1203
                        if _, ok := Labels["node-role.kubernetes.io/"+label]; !ok {
×
UNCOV
1204
                                matchFound = false
×
UNCOV
1205
                                break
×
1206
                        }
1207
                }
UNCOV
1208
                if matchFound {
×
UNCOV
1209
                        return true
×
UNCOV
1210
                }
×
1211
        }
1212
        return false
1✔
1213
}
1214

1215
func (cont *AciController) queueServiceUpdateByKey(key string) {
1✔
1216
        cont.serviceQueue.Add(key)
1✔
1217
}
1✔
1218

1219
func (cont *AciController) queueServiceUpdate(service *v1.Service) {
1✔
1220
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
1221
        if err != nil {
1✔
UNCOV
1222
                serviceLogger(cont.log, service).
×
UNCOV
1223
                        Error("Could not create service key: ", err)
×
UNCOV
1224
                return
×
UNCOV
1225
        }
×
1226
        cont.serviceQueue.Add(key)
1✔
1227
}
1228

1229
func apicDeviceCluster(name string, vrfTenant string,
1230
        physDom string, encap string,
1231
        nodes []string, nodeMap map[string]string) (apicapi.ApicObject, string) {
1✔
1232
        dc := apicapi.NewVnsLDevVip(vrfTenant, name)
1✔
1233
        dc.SetAttr("managed", "no")
1✔
1234
        dcDn := dc.GetDn()
1✔
1235
        dc.AddChild(apicapi.NewVnsRsALDevToPhysDomP(dcDn,
1✔
1236
                fmt.Sprintf("uni/phys-%s", physDom)))
1✔
1237
        lif := apicapi.NewVnsLIf(dcDn, "interface")
1✔
1238
        lif.SetAttr("encap", encap)
1✔
1239
        lifDn := lif.GetDn()
1✔
1240

1✔
1241
        for _, node := range nodes {
2✔
1242
                path, ok := nodeMap[node]
1✔
1243
                if !ok {
1✔
UNCOV
1244
                        continue
×
1245
                }
1246

1247
                cdev := apicapi.NewVnsCDev(dcDn, node)
1✔
1248
                cif := apicapi.NewVnsCif(cdev.GetDn(), "interface")
1✔
1249
                cif.AddChild(apicapi.NewVnsRsCIfPathAtt(cif.GetDn(), path))
1✔
1250
                cdev.AddChild(cif)
1✔
1251
                lif.AddChild(apicapi.NewVnsRsCIfAttN(lifDn, cif.GetDn()))
1✔
1252
                dc.AddChild(cdev)
1✔
1253
        }
1254

1255
        dc.AddChild(lif)
1✔
1256

1✔
1257
        return dc, dcDn
1✔
1258
}
1259

1260
func apicServiceGraph(name string, tenantName string,
1261
        dcDn string) apicapi.ApicObject {
1✔
1262
        sg := apicapi.NewVnsAbsGraph(tenantName, name)
1✔
1263
        sgDn := sg.GetDn()
1✔
1264
        var provDn string
1✔
1265
        var consDn string
1✔
1266
        var cTermDn string
1✔
1267
        var pTermDn string
1✔
1268
        {
2✔
1269
                an := apicapi.NewVnsAbsNode(sgDn, "loadbalancer")
1✔
1270
                an.SetAttr("managed", "no")
1✔
1271
                an.SetAttr("routingMode", "Redirect")
1✔
1272
                anDn := an.GetDn()
1✔
1273
                cons := apicapi.NewVnsAbsFuncConn(anDn, "consumer")
1✔
1274
                consDn = cons.GetDn()
1✔
1275
                an.AddChild(cons)
1✔
1276
                prov := apicapi.NewVnsAbsFuncConn(anDn, "provider")
1✔
1277
                provDn = prov.GetDn()
1✔
1278
                an.AddChild(prov)
1✔
1279
                an.AddChild(apicapi.NewVnsRsNodeToLDev(anDn, dcDn))
1✔
1280
                sg.AddChild(an)
1✔
1281
        }
1✔
1282
        {
1✔
1283
                tnc := apicapi.NewVnsAbsTermNodeCon(sgDn, "T1")
1✔
1284
                tncDn := tnc.GetDn()
1✔
1285
                cTerm := apicapi.NewVnsAbsTermConn(tncDn)
1✔
1286
                cTermDn = cTerm.GetDn()
1✔
1287
                tnc.AddChild(cTerm)
1✔
1288
                tnc.AddChild(apicapi.NewVnsInTerm(tncDn))
1✔
1289
                tnc.AddChild(apicapi.NewVnsOutTerm(tncDn))
1✔
1290
                sg.AddChild(tnc)
1✔
1291
        }
1✔
1292
        {
1✔
1293
                tnp := apicapi.NewVnsAbsTermNodeProv(sgDn, "T2")
1✔
1294
                tnpDn := tnp.GetDn()
1✔
1295
                pTerm := apicapi.NewVnsAbsTermConn(tnpDn)
1✔
1296
                pTermDn = pTerm.GetDn()
1✔
1297
                tnp.AddChild(pTerm)
1✔
1298
                tnp.AddChild(apicapi.NewVnsInTerm(tnpDn))
1✔
1299
                tnp.AddChild(apicapi.NewVnsOutTerm(tnpDn))
1✔
1300
                sg.AddChild(tnp)
1✔
1301
        }
1✔
1302
        {
1✔
1303
                acc := apicapi.NewVnsAbsConnection(sgDn, "C1")
1✔
1304
                acc.SetAttr("connDir", "provider")
1✔
1305
                accDn := acc.GetDn()
1✔
1306
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, consDn))
1✔
1307
                acc.AddChild(apicapi.NewVnsRsAbsConnectionConns(accDn, cTermDn))
1✔
1308
                sg.AddChild(acc)
1✔
1309
        }
1✔
1310
        {
1✔
1311
                acp := apicapi.NewVnsAbsConnection(sgDn, "C2")
1✔
1312
                acp.SetAttr("connDir", "provider")
1✔
1313
                acpDn := acp.GetDn()
1✔
1314
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, provDn))
1✔
1315
                acp.AddChild(apicapi.NewVnsRsAbsConnectionConns(acpDn, pTermDn))
1✔
1316
                sg.AddChild(acp)
1✔
1317
        }
1✔
1318
        return sg
1✔
1319
}
1320
func (cont *AciController) updateDeviceCluster() {
1✔
1321
        nodeMap := make(map[string]string)
1✔
1322
        cont.indexMutex.Lock()
1✔
1323
        for node := range cont.nodeOpflexDevice {
2✔
1324
                cont.log.Debug("Processing node in nodeOpflexDevice cache : ", node)
1✔
1325
                fabricPath, ok := cont.fabricPathForNode(node)
1✔
1326
                if !ok {
2✔
1327
                        continue
1✔
1328
                }
1329
                nodeMap[node] = fabricPath
1✔
1330
        }
1331

1332
        // For clusters other than OpenShift On OpenStack,
1333
        // openStackFabricPathDnMap will be empty
1334
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1335
                nodeMap[host] = opflexOdevInfo.fabricPathDn
×
1336
        }
×
1337

1338
        // For OpenShift On OpenStack clusters,
1339
        // hostFabricPathDnMap will be empty
1340
        for _, hostInfo := range cont.hostFabricPathDnMap {
1✔
1341
                if hostInfo.fabricPathDn != "" {
×
1342
                        nodeMap[hostInfo.host] = hostInfo.fabricPathDn
×
1343
                }
×
1344
        }
1345
        cont.indexMutex.Unlock()
1✔
1346

1✔
1347
        var nodes []string
1✔
1348
        for node := range nodeMap {
2✔
1349
                nodes = append(nodes, node)
1✔
1350
        }
1✔
1351
        sort.Strings(nodes)
1✔
1352

1✔
1353
        name := cont.aciNameForKey("svc", "global")
1✔
1354
        var serviceObjs apicapi.ApicSlice
1✔
1355

1✔
1356
        // 1. Device cluster:
1✔
1357
        // The device cluster is a set of physical paths that need to be
1✔
1358
        // created for each node in the cluster, that correspond to the
1✔
1359
        // service interface for each node.
1✔
1360
        dc, dcDn := apicDeviceCluster(name, cont.config.AciVrfTenant,
1✔
1361
                cont.config.AciServicePhysDom, cont.config.AciServiceEncap,
1✔
1362
                nodes, nodeMap)
1✔
1363
        serviceObjs = append(serviceObjs, dc)
1✔
1364

1✔
1365
        // 2. Service graph template
1✔
1366
        // The service graph controls how the traffic will be redirected.
1✔
1367
        // A service graph must be created for each device cluster.
1✔
1368
        serviceObjs = append(serviceObjs,
1✔
1369
                apicServiceGraph(name, cont.config.AciVrfTenant, dcDn))
1✔
1370

1✔
1371
        cont.apicConn.WriteApicObjects(name, serviceObjs)
1✔
1372
}
1373

1374
func (cont *AciController) fabricPathLogger(node string,
1375
        obj apicapi.ApicObject) *logrus.Entry {
1✔
1376
        return cont.log.WithFields(logrus.Fields{
1✔
1377
                "fabricPath": obj.GetAttr("fabricPathDn"),
1✔
1378
                "mac":        obj.GetAttr("mac"),
1✔
1379
                "node":       node,
1✔
1380
                "obj":        obj,
1✔
1381
        })
1✔
1382
}
1✔
1383

1384
func (cont *AciController) setOpenStackSystemId() string {
×
1385

×
1386
        // 1) get opflexIDEp with containerName == <node name of any one of the openshift nodes>
×
1387
        // 2) extract OpenStack system id from compHvDn attribute
×
1388
        //    comp/prov-OpenStack/ctrlr-[k8s-scale]-k8s-scale/hv-overcloud-novacompute-0 - sample compHvDn,
×
1389
        //    where k8s-scale is the system id
×
1390

×
1391
        var systemId string
×
1392
        nodeList := cont.nodeIndexer.List()
×
1393
        if len(nodeList) < 1 {
×
1394
                return systemId
×
1395
        }
×
1396
        node := nodeList[0].(*v1.Node)
×
1397
        nodeName := node.ObjectMeta.Name
×
1398
        opflexIDEpFilter := fmt.Sprintf("query-target-filter=and(eq(opflexIDEp.containerName,\"%s\"))", nodeName)
×
1399
        opflexIDEpArgs := []string{
×
1400
                opflexIDEpFilter,
×
1401
        }
×
1402
        url := fmt.Sprintf("/api/node/class/opflexIDEp.json?%s", strings.Join(opflexIDEpArgs, "&"))
×
1403
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
1404
        if err != nil {
×
1405
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
1406
                return systemId
×
1407
        }
×
1408
        for _, obj := range apicresp.Imdata {
×
1409
                for _, body := range obj {
×
1410
                        compHvDn, ok := body.Attributes["compHvDn"].(string)
×
UNCOV
1411
                        if ok {
×
UNCOV
1412
                                systemId = compHvDn[strings.IndexByte(compHvDn, '[')+1 : strings.IndexByte(compHvDn, ']')]
×
1413
                                break
×
1414
                        }
1415
                }
1416
        }
1417
        cont.indexMutex.Lock()
×
1418
        cont.openStackSystemId = systemId
×
1419
        cont.log.Info("Setting OpenStack system id : ", cont.openStackSystemId)
×
1420
        cont.indexMutex.Unlock()
×
1421
        return systemId
×
1422
}
1423

1424
// Returns true when a new OpenStack opflexODev is added
1425
func (cont *AciController) openStackOpflexOdevUpdate(obj apicapi.ApicObject) bool {
×
1426

×
1427
        // If opflexOdev compHvDn contains comp/prov-OpenShift/ctrlr-[<systemid>]-<systemid>,
×
1428
        // it means that it is an OpenStack OpflexOdev which belongs to OpenStack with system id <systemid>
×
1429

×
1430
        var deviceClusterUpdate bool
×
1431
        compHvDn := obj.GetAttrStr("compHvDn")
×
1432
        if strings.Contains(compHvDn, "prov-OpenStack") {
×
1433
                cont.indexMutex.Lock()
×
1434
                systemId := cont.openStackSystemId
×
1435
                cont.indexMutex.Unlock()
×
1436
                if systemId == "" {
×
1437
                        systemId = cont.setOpenStackSystemId()
×
1438
                }
×
1439
                if systemId == "" {
×
1440
                        cont.log.Error("Failed  to get OpenStack system id")
×
UNCOV
1441
                        return deviceClusterUpdate
×
UNCOV
1442
                }
×
1443
                prefix := fmt.Sprintf("comp/prov-OpenStack/ctrlr-[%s]-%s", systemId, systemId)
×
1444
                if strings.Contains(compHvDn, prefix) {
×
1445
                        cont.log.Info("Received notification for OpenStack opflexODev update, hostName: ",
×
1446
                                obj.GetAttrStr("hostName"), " dn: ", obj.GetAttrStr("dn"))
×
1447
                        cont.indexMutex.Lock()
×
1448
                        opflexOdevInfo, ok := cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")]
×
1449
                        if ok {
×
1450
                                opflexOdevInfo.opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
1451
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = opflexOdevInfo
×
1452
                        } else {
×
1453
                                var openstackopflexodevinfo openstackOpflexOdevInfo
×
1454
                                opflexODevDn := make(map[string]struct{})
×
1455
                                opflexODevDn[obj.GetAttrStr("dn")] = struct{}{}
×
UNCOV
1456
                                openstackopflexodevinfo.fabricPathDn = obj.GetAttrStr("fabricPathDn")
×
UNCOV
1457
                                openstackopflexodevinfo.opflexODevDn = opflexODevDn
×
1458
                                cont.openStackFabricPathDnMap[obj.GetAttrStr("hostName")] = openstackopflexodevinfo
×
1459
                                deviceClusterUpdate = true
×
1460
                        }
×
1461
                        cont.indexMutex.Unlock()
×
1462
                }
1463
        }
1464
        return deviceClusterUpdate
×
1465
}
1466

1467
func (cont *AciController) infraRtAttEntPDeleted(dn string) {
×
UNCOV
1468
        // dn format : uni/infra/attentp-k8s-scale-esxi-aaep/rtattEntP-[uni/infra/funcprof/accbundle-esxi1-vpc-ipg]
×
UNCOV
1469
        cont.log.Info("Processing delete of infraRtAttEntP: ", dn)
×
1470

×
1471
        // extract uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1472
        re := regexp.MustCompile(`\[(.*?)\]`)
×
1473
        matches := re.FindStringSubmatch(dn)
×
1474

×
1475
        if len(matches) < 2 {
×
1476
                cont.log.Error("Failed to extract ipg from dn : ", dn)
×
1477
                return
×
1478
        }
×
1479
        tdn := matches[1]
×
1480

×
1481
        cont.indexMutex.Lock()
×
1482
        _, ok := cont.hostFabricPathDnMap[tdn]
×
1483
        if ok {
×
1484
                delete(cont.hostFabricPathDnMap, tdn)
×
1485
                cont.log.Info("Deleted ipg : ", tdn)
×
1486
        }
×
1487
        cont.indexMutex.Unlock()
×
1488

×
UNCOV
1489
        if ok {
×
1490
                cont.updateDeviceCluster()
×
1491
        }
×
1492
}
1493

1494
func (cont *AciController) vpcIfDeleted(dn string) {
×
1495
        var deleted bool
×
1496
        cont.indexMutex.Lock()
×
1497
        for tDn, hostInfo := range cont.hostFabricPathDnMap {
×
1498
                if _, present := hostInfo.vpcIfDn[dn]; present {
×
1499
                        cont.log.Info("Deleting vpcIf, dn :", dn)
×
1500
                        delete(hostInfo.vpcIfDn, dn)
×
1501
                        if len(hostInfo.vpcIfDn) == 0 {
×
1502
                                cont.log.Infof("Removing fabricPathDn(%s) of ipg : %s ", hostInfo.fabricPathDn, hostInfo.host)
×
1503
                                hostInfo.fabricPathDn = ""
×
1504
                                deleted = true
×
UNCOV
1505
                        }
×
1506
                        cont.hostFabricPathDnMap[tDn] = hostInfo
×
1507
                }
1508
        }
UNCOV
1509
        cont.indexMutex.Unlock()
×
1510
        if deleted {
×
1511
                cont.updateDeviceCluster()
×
1512
        }
×
1513
}
1514

1515
func (cont *AciController) vpcIfChanged(obj apicapi.ApicObject) {
×
1516
        if cont.updateHostFabricPathDnMap(obj) {
×
1517
                cont.updateDeviceCluster()
×
1518
        }
×
1519
}
1520

1521
func (cont *AciController) updateHostFabricPathDnMap(obj apicapi.ApicObject) bool {
×
1522
        var accBndlGrpDn, fabricPathDn, dn string
×
1523
        for _, body := range obj {
×
1524
                var ok bool
×
1525
                accBndlGrpDn, ok = body.Attributes["accBndlGrpDn"].(string)
×
1526
                if !ok || (ok && accBndlGrpDn == "") {
×
1527
                        cont.log.Error("accBndlGrpDn missing/empty in vpcIf")
×
1528
                        return false
×
1529
                }
×
1530
                fabricPathDn, ok = body.Attributes["fabricPathDn"].(string)
×
UNCOV
1531
                if !ok && (ok && fabricPathDn == "") {
×
UNCOV
1532
                        cont.log.Error("fabricPathDn missing/empty in vpcIf")
×
1533
                        return false
×
1534
                }
×
1535
                dn, ok = body.Attributes["dn"].(string)
×
1536
                if !ok && (ok && dn == "") {
×
1537
                        cont.log.Error("dn missing/empty in vpcIf")
×
1538
                        return false
×
1539
                }
×
1540
        }
1541
        var updated bool
×
1542
        cont.indexMutex.Lock()
×
1543
        // If accBndlGrpDn exists in hostFabricPathDnMap, the vpcIf belongs to the cluster AEP
×
1544
        hostInfo, exists := cont.hostFabricPathDnMap[accBndlGrpDn]
×
1545
        if exists {
×
1546
                if _, present := hostInfo.vpcIfDn[dn]; !present {
×
1547
                        hostInfo.vpcIfDn[dn] = struct{}{}
×
1548
                        cont.log.Infof("vpcIf processing, dn : %s, accBndlGrpDn: %s", dn, accBndlGrpDn)
×
1549
                }
×
1550
                if hostInfo.fabricPathDn != fabricPathDn {
×
1551
                        hostInfo.fabricPathDn = fabricPathDn
×
1552
                        cont.log.Info("Updated fabricPathDn of ipg :", hostInfo.host, " to: ", hostInfo.fabricPathDn)
×
1553
                        updated = true
×
1554
                }
×
UNCOV
1555
                cont.hostFabricPathDnMap[accBndlGrpDn] = hostInfo
×
1556
        }
1557
        cont.indexMutex.Unlock()
×
1558
        return updated
×
1559
}
1560

UNCOV
1561
func (cont *AciController) infraRtAttEntPChanged(obj apicapi.ApicObject) {
×
1562
        var tdn string
×
1563
        for _, body := range obj {
×
1564
                var ok bool
×
1565
                tdn, ok = body.Attributes["tDn"].(string)
×
UNCOV
1566
                if !ok || (ok && tdn == "") {
×
UNCOV
1567
                        cont.log.Error("tDn missing/empty in infraRtAttEntP")
×
UNCOV
1568
                        return
×
UNCOV
1569
                }
×
1570
        }
UNCOV
1571
        var updated bool
×
UNCOV
1572
        cont.log.Info("infraRtAttEntP updated, tDn : ", tdn)
×
UNCOV
1573

×
1574
        // tdn format for vpc : /uni/infra/funcprof/accbundle-esxi1-vpc-ipg
×
1575
        // tdn format for single leaf : /uni/infra/funcprof/accportgrp-IPG_CLIENT_SIM
×
1576

×
1577
        // Ignore processing of single leaf
×
UNCOV
1578
        if !strings.Contains(tdn, "/accbundle-") {
×
UNCOV
1579
                cont.log.Info("Skipping processing of infraRtAttEntP update, not applicable for non-VPC configuration: ", tdn)
×
UNCOV
1580
                return
×
UNCOV
1581
        }
×
1582

1583
        // extract esxi1-vpc-ipg
UNCOV
1584
        parts := strings.Split(tdn, "/")
×
1585
        lastPart := parts[len(parts)-1]
×
1586
        host := strings.TrimPrefix(lastPart, "accbundle-")
×
1587

×
1588
        // adding entry for ipg in hostFabricPathDnMap
×
1589
        cont.indexMutex.Lock()
×
1590
        _, exists := cont.hostFabricPathDnMap[tdn]
×
UNCOV
1591
        if !exists {
×
1592
                var hostInfo hostFabricInfo
×
1593
                hostInfo.host = host
×
UNCOV
1594
                hostInfo.vpcIfDn = make(map[string]struct{})
×
UNCOV
1595
                cont.hostFabricPathDnMap[tdn] = hostInfo
×
UNCOV
1596
        }
×
UNCOV
1597
        cont.indexMutex.Unlock()
×
UNCOV
1598

×
UNCOV
1599
        accBndlGrpFilter := fmt.Sprintf(`query-target-filter=and(eq(vpcIf.accBndlGrpDn,"%s"))`, tdn)
×
UNCOV
1600
        url := fmt.Sprintf("/api/class/vpcIf.json?%s", accBndlGrpFilter)
×
UNCOV
1601
        apicresp, err := cont.apicConn.GetApicResponse(url)
×
UNCOV
1602
        if err != nil {
×
UNCOV
1603
                cont.log.Error("Failed to get APIC response, err: ", err.Error())
×
UNCOV
1604
                return
×
UNCOV
1605
        }
×
1606

UNCOV
1607
        for _, obj := range apicresp.Imdata {
×
UNCOV
1608
                if cont.updateHostFabricPathDnMap(obj) && !updated {
×
UNCOV
1609
                        updated = true
×
UNCOV
1610
                }
×
1611
        }
1612

UNCOV
1613
        if updated {
×
UNCOV
1614
                cont.updateDeviceCluster()
×
UNCOV
1615
        }
×
UNCOV
1616
        return
×
1617
}
1618

1619
func (cont *AciController) opflexDeviceChanged(obj apicapi.ApicObject) {
1✔
1620
        devType := obj.GetAttrStr("devType")
1✔
1621
        domName := obj.GetAttrStr("domName")
1✔
1622
        ctrlrName := obj.GetAttrStr("ctrlrName")
1✔
1623

1✔
1624
        if !cont.config.DisableServiceVlanPreprovisioning && strings.Contains(cont.config.Flavor, "openstack") {
1✔
UNCOV
1625
                if cont.openStackOpflexOdevUpdate(obj) {
×
UNCOV
1626
                        cont.log.Info("OpenStack opflexODev for ", obj.GetAttrStr("hostName"), " is added")
×
UNCOV
1627
                        cont.updateDeviceCluster()
×
UNCOV
1628
                }
×
1629
        }
1630
        if (devType == cont.env.OpFlexDeviceType()) && (domName == cont.config.AciVmmDomain) && (ctrlrName == cont.config.AciVmmController) {
2✔
1631
                cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Processing opflex device update")
1✔
1632
                if obj.GetAttrStr("state") == "disconnected" {
2✔
1633
                        cont.fabricPathLogger(obj.GetAttrStr("hostName"), obj).Debug("Opflex device disconnected")
1✔
1634
                        cont.indexMutex.Lock()
1✔
1635
                        for node, devices := range cont.nodeOpflexDevice {
1✔
UNCOV
1636
                                if node == obj.GetAttrStr("hostName") {
×
UNCOV
1637
                                        for _, device := range devices {
×
UNCOV
1638
                                                if device.GetDn() == obj.GetDn() {
×
UNCOV
1639
                                                        device.SetAttr("state", "disconnected")
×
UNCOV
1640
                                                        cont.fabricPathLogger(device.GetAttrStr("hostName"), device).Debug("Opflex device cache updated for disconnected node")
×
UNCOV
1641
                                                }
×
1642
                                        }
UNCOV
1643
                                        cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", devices)
×
UNCOV
1644
                                        break
×
1645
                                }
1646
                        }
1647
                        cont.indexMutex.Unlock()
1✔
1648
                        cont.updateDeviceCluster()
1✔
1649
                        return
1✔
1650
                }
1651
                var nodeUpdates []string
1✔
1652

1✔
1653
                cont.indexMutex.Lock()
1✔
1654
                nodefound := false
1✔
1655
                for node, devices := range cont.nodeOpflexDevice {
2✔
1656
                        found := false
1✔
1657

1✔
1658
                        if node == obj.GetAttrStr("hostName") {
2✔
1659
                                nodefound = true
1✔
1660
                        }
1✔
1661

1662
                        for i, device := range devices {
2✔
1663
                                if device.GetDn() != obj.GetDn() {
2✔
1664
                                        continue
1✔
1665
                                }
1666
                                found = true
1✔
1667

1✔
1668
                                if obj.GetAttrStr("hostName") != node {
2✔
1669
                                        cont.fabricPathLogger(node, device).
1✔
1670
                                                Debug("Moving opflex device from node")
1✔
1671

1✔
1672
                                        devices = append(devices[:i], devices[i+1:]...)
1✔
1673
                                        cont.nodeOpflexDevice[node] = devices
1✔
1674
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1675
                                        break
1✔
1676
                                } else if (device.GetAttrStr("mac") != obj.GetAttrStr("mac")) ||
1✔
1677
                                        (device.GetAttrStr("fabricPathDn") != obj.GetAttrStr("fabricPathDn")) ||
1✔
1678
                                        (device.GetAttrStr("state") != obj.GetAttrStr("state")) {
2✔
1679
                                        cont.fabricPathLogger(node, obj).
1✔
1680
                                                Debug("Updating opflex device")
1✔
1681

1✔
1682
                                        devices = append(append(devices[:i], devices[i+1:]...), obj)
1✔
1683
                                        cont.nodeOpflexDevice[node] = devices
1✔
1684
                                        nodeUpdates = append(nodeUpdates, node)
1✔
1685
                                        break
1✔
1686
                                }
1687
                        }
1688
                        if !found && obj.GetAttrStr("hostName") == node {
2✔
1689
                                cont.fabricPathLogger(node, obj).
1✔
1690
                                        Debug("Appending opflex device")
1✔
1691

1✔
1692
                                devices = append(devices, obj)
1✔
1693
                                cont.nodeOpflexDevice[node] = devices
1✔
1694
                                nodeUpdates = append(nodeUpdates, node)
1✔
1695
                        }
1✔
1696
                }
1697
                if !nodefound {
2✔
1698
                        node := obj.GetAttrStr("hostName")
1✔
1699
                        cont.fabricPathLogger(node, obj).Debug("Adding opflex device")
1✔
1700
                        cont.nodeOpflexDevice[node] = apicapi.ApicSlice{obj}
1✔
1701
                        nodeUpdates = append(nodeUpdates, node)
1✔
1702
                }
1✔
1703
                cont.log.Info("Opflex device list for node ", obj.GetAttrStr("hostName"), ": ", cont.nodeOpflexDevice[obj.GetAttrStr("hostName")])
1✔
1704
                cont.indexMutex.Unlock()
1✔
1705

1✔
1706
                for _, node := range nodeUpdates {
2✔
1707
                        cont.env.NodeServiceChanged(node)
1✔
1708
                        cont.erspanSyncOpflexDev()
1✔
1709
                }
1✔
1710
                cont.updateDeviceCluster()
1✔
1711
        }
1712
}
1713

1714
func (cont *AciController) postOpflexDeviceDelete(nodes []string) {
1✔
1715
        cont.updateDeviceCluster()
1✔
1716
        for _, node := range nodes {
2✔
1717
                cont.env.NodeServiceChanged(node)
1✔
1718
                cont.erspanSyncOpflexDev()
1✔
1719
        }
1✔
1720
}
1721

1722
func (cont *AciController) opflexDeviceDeleted(dn string) {
1✔
1723
        var nodeUpdates []string
1✔
1724
        var dnFound bool //to check if the dn belongs to this cluster
1✔
1725
        cont.log.Info("Processing opflex device delete notification of ", dn)
1✔
1726
        cont.indexMutex.Lock()
1✔
1727
        for node, devices := range cont.nodeOpflexDevice {
2✔
1728
                for i, device := range devices {
2✔
1729
                        if device.GetDn() != dn {
2✔
1730
                                continue
1✔
1731
                        }
1732
                        dnFound = true
1✔
1733
                        cont.fabricPathLogger(node, device).
1✔
1734
                                Debug("Deleting opflex device path")
1✔
1735
                        devices = append(devices[:i], devices[i+1:]...)
1✔
1736
                        cont.nodeOpflexDevice[node] = devices
1✔
1737
                        cont.log.Info("Deleted opflex device of node ", node, ": ", dn)
1✔
1738
                        nodeUpdates = append(nodeUpdates, node)
1✔
1739
                        break
1✔
1740
                }
1741
                if len(devices) == 0 {
2✔
1742
                        delete(cont.nodeOpflexDevice, node)
1✔
1743
                }
1✔
1744
        }
1745

1746
        // For clusters other than OpenShift On OpenStack,
1747
        // openStackFabricPathDnMap will be empty
1748
        for host, opflexOdevInfo := range cont.openStackFabricPathDnMap {
1✔
1749
                if _, ok := opflexOdevInfo.opflexODevDn[dn]; ok {
×
1750
                        cont.log.Info("Received OpenStack opflexODev delete notification for ", dn)
×
1751
                        delete(opflexOdevInfo.opflexODevDn, dn)
×
1752
                        if len(opflexOdevInfo.opflexODevDn) < 1 {
×
1753
                                delete(cont.openStackFabricPathDnMap, host)
×
1754
                                cont.log.Info("OpenStack opflexODev of host ", host, " is deleted from cache")
×
1755
                                dnFound = true
×
1756
                        } else {
×
UNCOV
1757
                                cont.openStackFabricPathDnMap[host] = opflexOdevInfo
×
1758
                        }
×
UNCOV
1759
                        break
×
1760
                }
1761
        }
1762
        cont.indexMutex.Unlock()
1✔
1763

1✔
1764
        if dnFound {
2✔
1765
                cont.postOpflexDeviceDelete(nodeUpdates)
1✔
1766
        }
1✔
1767
}
1768

1769
func (cont *AciController) writeApicSvc(key string, service *v1.Service) {
1✔
1770
        if cont.isCNOEnabled() {
1✔
UNCOV
1771
                return
×
1772
        }
×
1773
        aobj := apicapi.NewVmmInjectedSvc(cont.vmmDomainProvider(),
1✔
1774
                cont.config.AciVmmDomain, cont.config.AciVmmController,
1✔
1775
                service.Namespace, service.Name)
1✔
1776
        aobjDn := aobj.GetDn()
1✔
1777
        aobj.SetAttr("guid", string(service.UID))
1✔
1778

1✔
1779
        svcns := service.ObjectMeta.Namespace
1✔
1780
        _, exists, err := cont.namespaceIndexer.GetByKey(svcns)
1✔
1781
        if err != nil {
1✔
1782
                cont.log.Error("Failed to lookup ns : ", svcns, " ", err)
×
1783
                return
×
1784
        }
×
1785
        if !exists {
2✔
1786
                cont.log.Debug("Namespace of service ", service.ObjectMeta.Name, ": ", svcns, " doesn't exist, hence not sending an update to the APIC")
1✔
1787
                return
1✔
1788
        }
1✔
1789

1790
        if !cont.serviceEndPoints.SetServiceApicObject(aobj, service) {
2✔
1791
                return
1✔
1792
        }
1✔
1793
        var setApicSvcDnsName bool
1✔
1794
        if len(cont.config.ApicHosts) != 0 && apicapi.ApicVersion >= "5.1" {
1✔
UNCOV
1795
                setApicSvcDnsName = true
×
UNCOV
1796
        }
×
1797
        // APIC model only allows one of these
1798
        for _, ingress := range service.Status.LoadBalancer.Ingress {
1✔
UNCOV
1799
                if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
UNCOV
1800
                        aobj.SetAttr("lbIp", ingress.IP)
×
1801
                } else if ingress.Hostname != "" {
×
1802
                        ipList, err := net.LookupHost(ingress.Hostname)
×
1803
                        if err == nil && len(ipList) > 0 {
×
1804
                                aobj.SetAttr("lbIp", ipList[0])
×
1805
                        } else {
×
1806
                                cont.log.Errorf("Lookup: err: %v, ipList: %+v", err, ipList)
×
UNCOV
1807
                        }
×
1808
                }
UNCOV
1809
                break
×
1810
        }
1811
        if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != "None" {
2✔
1812
                aobj.SetAttr("clusterIp", service.Spec.ClusterIP)
1✔
1813
        }
1✔
1814

1815
        var t string
1✔
1816
        switch service.Spec.Type {
1✔
1817
        case v1.ServiceTypeClusterIP:
×
1818
                t = "clusterIp"
×
1819
        case v1.ServiceTypeNodePort:
×
UNCOV
1820
                t = "nodePort"
×
1821
        case v1.ServiceTypeLoadBalancer:
1✔
1822
                t = "loadBalancer"
1✔
UNCOV
1823
        case v1.ServiceTypeExternalName:
×
UNCOV
1824
                t = "externalName"
×
1825
        }
1826
        if t != "" {
2✔
1827
                aobj.SetAttr("type", t)
1✔
1828
        }
1✔
1829

1830
        if setApicSvcDnsName || cont.config.Flavor == "k8s-overlay" {
1✔
UNCOV
1831
                dnsName := fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace)
×
UNCOV
1832

×
UNCOV
1833
                for _, ingress := range service.Status.LoadBalancer.Ingress {
×
UNCOV
1834
                        if ingress.Hostname != "" {
×
UNCOV
1835
                                aobj.SetAttr("dnsName", ingress.Hostname)
×
UNCOV
1836
                        } else if ingress.IP != "" && ingress.IP != "0.0.0.0" {
×
UNCOV
1837
                                aobj.SetAttr("dnsName", dnsName)
×
UNCOV
1838
                        }
×
1839
                }
UNCOV
1840
                if t == "clusterIp" || t == "nodePort" || t == "externalName" {
×
UNCOV
1841
                        aobj.SetAttr("dnsName", dnsName)
×
UNCOV
1842
                }
×
1843
        }
1844
        for _, port := range service.Spec.Ports {
2✔
1845
                proto := getProtocolStr(port.Protocol)
1✔
1846
                p := apicapi.NewVmmInjectedSvcPort(aobjDn,
1✔
1847
                        strconv.Itoa(int(port.Port)), proto, port.TargetPort.String())
1✔
1848
                p.SetAttr("nodePort", strconv.Itoa(int(port.NodePort)))
1✔
1849
                aobj.AddChild(p)
1✔
1850
        }
1✔
1851
        if cont.config.EnableVmmInjectedLabels && service.ObjectMeta.Labels != nil && apicapi.ApicVersion >= "5.2" {
1✔
UNCOV
1852
                for key, val := range service.ObjectMeta.Labels {
×
UNCOV
1853
                        newLabelKey := cont.aciNameForKey("label", key)
×
UNCOV
1854
                        label := apicapi.NewVmmInjectedLabel(aobj.GetDn(),
×
UNCOV
1855
                                newLabelKey, val)
×
UNCOV
1856
                        aobj.AddChild(label)
×
UNCOV
1857
                }
×
1858
        }
1859
        name := cont.aciNameForKey("service-vmm", key)
1✔
1860
        cont.log.Debug("Write Service Object: ", aobj)
1✔
1861
        cont.apicConn.WriteApicObjects(name, apicapi.ApicSlice{aobj})
1✔
1862
        cont.log.Debugf("svcObject: %+v", aobj)
1✔
1863
}
1864

1865
func removeAllConditions(conditions []metav1.Condition, conditionType string) []metav1.Condition {
1✔
1866
        i := 0
1✔
1867
        for _, cond := range conditions {
1✔
UNCOV
1868
                if cond.Type != conditionType {
×
UNCOV
1869
                        conditions[i] = cond
×
UNCOV
1870
                }
×
1871
        }
1872
        return conditions[:i]
1✔
1873
}
1874

1875
func (cont *AciController) updateServiceCondition(service *v1.Service, success bool, reason string, message string) bool {
1✔
1876
        conditionType := "LbIpamAllocation"
1✔
1877

1✔
1878
        var condition metav1.Condition
1✔
1879
        if success {
2✔
1880
                condition.Status = metav1.ConditionTrue
1✔
1881
        } else {
2✔
1882
                condition.Status = metav1.ConditionFalse
1✔
1883
                condition.Message = message
1✔
1884
        }
1✔
1885
        condition.Type = conditionType
1✔
1886
        condition.Reason = reason
1✔
1887
        condition.LastTransitionTime = metav1.Time{time.Now()}
1✔
1888
        for _, cond := range service.Status.Conditions {
2✔
1889
                if cond.Type == conditionType &&
1✔
1890
                        cond.Status == condition.Status &&
1✔
1891
                        cond.Message == condition.Message &&
1✔
1892
                        cond.Reason == condition.Reason {
2✔
1893
                        return false
1✔
1894
                }
1✔
1895
        }
1896

1897
        service.Status.Conditions = removeAllConditions(service.Status.Conditions, conditionType)
1✔
1898
        service.Status.Conditions = append(service.Status.Conditions, condition)
1✔
1899
        return true
1✔
1900
}
1901

1902
func (cont *AciController) validateRequestedIps(lbIpList []string) (net.IP, net.IP, bool) {
1✔
1903
        var ipv4, ipv6 net.IP
1✔
1904
        for _, lbIp := range lbIpList {
2✔
1905
                ip := net.ParseIP(lbIp)
1✔
1906
                if ip != nil {
2✔
1907
                        if ip.To4() != nil {
2✔
1908
                                if ipv4.Equal(net.IP{}) {
2✔
1909
                                        ipv4 = ip
1✔
1910
                                } else {
2✔
1911
                                        cont.log.Error("Annotation should have only one ipv4")
1✔
1912
                                        return ipv4, ipv6, false
1✔
1913
                                }
1✔
1914
                        } else if ip.To16() != nil {
2✔
1915
                                if ipv6.Equal(net.IP{}) {
2✔
1916
                                        ipv6 = ip
1✔
1917
                                } else {
2✔
1918
                                        cont.log.Error("Annotation should have only one ipv6")
1✔
1919
                                        return ipv4, ipv6, false
1✔
1920
                                }
1✔
1921
                        }
1922
                }
1923
        }
1924
        return ipv4, ipv6, true
1✔
1925
}
1926

1927
func (cont *AciController) returnUnusedStaticIngressIps(staticIngressIps, requestedIps []net.IP) {
1✔
1928
        for _, staticIp := range staticIngressIps {
2✔
1929
                found := false
1✔
1930
                for _, reqIp := range requestedIps {
2✔
1931
                        if reqIp.Equal(staticIp) {
2✔
1932
                                found = true
1✔
1933
                        }
1✔
1934
                }
1935
                if !found {
2✔
1936
                        returnIps(cont.staticServiceIps, []net.IP{staticIp})
1✔
1937
                }
1✔
1938
        }
1939
}
1940

1941
func (cont *AciController) allocateServiceIps(servicekey string,
1942
        service *v1.Service) bool {
1✔
1943
        logger := serviceLogger(cont.log, service)
1✔
1944
        cont.indexMutex.Lock()
1✔
1945
        meta, ok := cont.serviceMetaCache[servicekey]
1✔
1946
        if !ok {
2✔
1947
                meta = &serviceMeta{}
1✔
1948
                cont.serviceMetaCache[servicekey] = meta
1✔
1949

1✔
1950
                // Read any existing IPs and attempt to allocate them to the pod
1✔
1951
                for _, ingress := range service.Status.LoadBalancer.Ingress {
2✔
1952
                        ip := net.ParseIP(ingress.IP)
1✔
1953
                        if ip == nil {
1✔
UNCOV
1954
                                continue
×
1955
                        }
1956
                        if ip.To4() != nil {
2✔
1957
                                if cont.serviceIps.GetV4IpCache()[0].RemoveIp(ip) {
2✔
1958
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1959
                                } else if cont.staticServiceIps.V4.RemoveIp(ip) {
3✔
1960
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1961
                                }
1✔
1962
                        } else if ip.To16() != nil {
2✔
1963
                                if cont.serviceIps.GetV6IpCache()[0].RemoveIp(ip) {
2✔
1964
                                        meta.ingressIps = append(meta.ingressIps, ip)
1✔
1965
                                } else if cont.staticServiceIps.V6.RemoveIp(ip) {
3✔
1966
                                        meta.staticIngressIps = append(meta.staticIngressIps, ip)
1✔
1967
                                }
1✔
1968
                        }
1969
                }
1970
        }
1971

1972
        if !cont.serviceSyncEnabled {
2✔
1973
                cont.indexMutex.Unlock()
1✔
1974
                return false
1✔
1975
        }
1✔
1976

1977
        var requestedIps []net.IP
1✔
1978
        // try to give the requested load balancer IP to the pod
1✔
1979
        lbIps, ok := service.ObjectMeta.Annotations[metadata.LbIpAnnotation]
1✔
1980
        if ok {
2✔
1981
                lbIpList := strings.Split(lbIps, ",")
1✔
1982
                ipv4, ipv6, valid := cont.validateRequestedIps(lbIpList)
1✔
1983
                if valid {
2✔
1984
                        if ipv4 != nil {
2✔
1985
                                requestedIps = append(requestedIps, ipv4)
1✔
1986
                        }
1✔
1987
                        if ipv6 != nil {
2✔
1988
                                requestedIps = append(requestedIps, ipv6)
1✔
1989
                        }
1✔
1990
                } else {
1✔
1991
                        cont.returnServiceIps(meta.ingressIps)
1✔
1992
                        cont.log.Error("Invalid LB IP annotation for service ", servicekey)
1✔
1993
                        condUpdated := cont.updateServiceCondition(service, false, "InvalidAnnotation", "Invalid Loadbalancer IP annotation")
1✔
1994
                        if condUpdated {
2✔
1995
                                _, err := cont.updateServiceStatus(service)
1✔
1996
                                if err != nil {
1✔
UNCOV
1997
                                        logger.Error("Failed to update service status : ", err)
×
UNCOV
1998
                                        cont.indexMutex.Unlock()
×
UNCOV
1999
                                        return true
×
UNCOV
2000
                                }
×
2001
                        }
2002
                        cont.indexMutex.Unlock()
1✔
2003
                        return false
1✔
2004
                }
2005
        } else {
1✔
2006
                requestedIp := net.ParseIP(service.Spec.LoadBalancerIP)
1✔
2007
                if requestedIp != nil {
2✔
2008
                        requestedIps = append(requestedIps, requestedIp)
1✔
2009
                }
1✔
2010
        }
2011
        if len(requestedIps) > 0 {
2✔
2012
                meta.requestedIps = []net.IP{}
1✔
2013
                for _, requestedIp := range requestedIps {
2✔
2014
                        hasRequestedIp := false
1✔
2015
                        for _, ip := range meta.staticIngressIps {
2✔
2016
                                if reflect.DeepEqual(requestedIp, ip) {
2✔
2017
                                        hasRequestedIp = true
1✔
2018
                                }
1✔
2019
                        }
2020
                        if !hasRequestedIp {
2✔
2021
                                if requestedIp.To4() != nil &&
1✔
2022
                                        cont.staticServiceIps.V4.RemoveIp(requestedIp) {
2✔
2023
                                        hasRequestedIp = true
1✔
2024
                                } else if requestedIp.To16() != nil &&
2✔
2025
                                        cont.staticServiceIps.V6.RemoveIp(requestedIp) {
2✔
2026
                                        hasRequestedIp = true
1✔
2027
                                }
1✔
2028
                        }
2029
                        if hasRequestedIp {
2✔
2030
                                meta.requestedIps = append(meta.requestedIps, requestedIp)
1✔
2031
                        }
1✔
2032
                }
2033
                cont.returnUnusedStaticIngressIps(meta.staticIngressIps, meta.requestedIps)
1✔
2034
                meta.staticIngressIps = meta.requestedIps
1✔
2035
                cont.returnServiceIps(meta.ingressIps)
1✔
2036
                meta.ingressIps = nil
1✔
2037
                // If no requested ips are allocatable
1✔
2038
                if len(meta.requestedIps) < 1 {
2✔
2039
                        logger.Error("No Requested Ip addresses available for service ", servicekey)
1✔
2040
                        condUpdated := cont.updateServiceCondition(service, false, "RequestedIpsNotAllocatable", "The requested ips for loadbalancer service are not available or not in extern static range")
1✔
2041
                        if condUpdated {
2✔
2042
                                _, err := cont.updateServiceStatus(service)
1✔
2043
                                if err != nil {
1✔
UNCOV
2044
                                        cont.indexMutex.Unlock()
×
UNCOV
2045
                                        logger.Error("Failed to update service status: ", err)
×
UNCOV
2046
                                        return true
×
UNCOV
2047
                                }
×
2048
                        }
2049
                        cont.indexMutex.Unlock()
1✔
2050
                        return false
1✔
2051
                }
2052
        } else if len(meta.requestedIps) > 0 {
1✔
UNCOV
2053
                meta.requestedIps = nil
×
UNCOV
2054
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
×
UNCOV
2055
                meta.staticIngressIps = nil
×
UNCOV
2056
        }
×
2057
        ingressIps := make([]net.IP, 0)
1✔
2058
        ingressIps = append(ingressIps, meta.ingressIps...)
1✔
2059
        ingressIps = append(ingressIps, meta.staticIngressIps...)
1✔
2060

1✔
2061
        var ipv4, ipv6 net.IP
1✔
2062
        for _, ip := range ingressIps {
2✔
2063
                if ip.To4() != nil {
2✔
2064
                        ipv4 = ip
1✔
2065
                } else if ip.To16() != nil {
3✔
2066
                        ipv6 = ip
1✔
2067
                }
1✔
2068
        }
2069
        var clusterIPv4, clusterIPv6 net.IP
1✔
2070
        clusterIPs := append([]string{service.Spec.ClusterIP}, service.Spec.ClusterIPs...)
1✔
2071
        for _, ipStr := range clusterIPs {
2✔
2072
                ip := net.ParseIP(ipStr)
1✔
2073
                if ip == nil {
1✔
UNCOV
2074
                        continue
×
2075
                }
2076
                if ip.To4() != nil && clusterIPv4 == nil {
2✔
2077
                        clusterIPv4 = ip
1✔
2078
                } else if ip.To16() != nil && strings.Contains(ip.String(), ":") && clusterIPv6 == nil {
3✔
2079
                        clusterIPv6 = ip
1✔
2080
                }
1✔
2081
        }
2082
        if clusterIPv4 != nil && ipv4 == nil {
2✔
2083
                if len(requestedIps) < 1 {
2✔
2084
                        ipv4, _ = cont.serviceIps.AllocateIp(true)
1✔
2085
                        if ipv4 != nil {
2✔
2086
                                ingressIps = append(ingressIps, ipv4)
1✔
2087
                        }
1✔
2088
                }
2089
        } else if clusterIPv4 == nil && ipv4 != nil {
1✔
UNCOV
2090
                cont.removeIpFromIngressIPList(&ingressIps, ipv4)
×
UNCOV
2091
        }
×
2092

2093
        if clusterIPv6 != nil && ipv6 == nil {
2✔
2094
                if len(requestedIps) < 1 {
2✔
2095
                        ipv6, _ = cont.serviceIps.AllocateIp(false)
1✔
2096
                        if ipv6 != nil {
2✔
2097
                                ingressIps = append(ingressIps, ipv6)
1✔
2098
                        }
1✔
2099
                }
2100
        } else if clusterIPv6 == nil && ipv6 != nil {
1✔
UNCOV
2101
                cont.removeIpFromIngressIPList(&ingressIps, ipv6)
×
2102
        }
×
2103

2104
        if len(requestedIps) < 1 {
2✔
2105
                meta.ingressIps = ingressIps
1✔
2106
        }
1✔
2107
        if ipv4 == nil && ipv6 == nil {
2✔
2108
                logger.Error("No IP addresses available for service")
1✔
2109
                cont.indexMutex.Unlock()
1✔
2110
                return true
1✔
2111
        }
1✔
2112
        cont.indexMutex.Unlock()
1✔
2113
        var newIngress []v1.LoadBalancerIngress
1✔
2114
        for _, ip := range meta.ingressIps {
2✔
2115
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2116
        }
1✔
2117
        for _, ip := range meta.staticIngressIps {
2✔
2118
                newIngress = append(newIngress, v1.LoadBalancerIngress{IP: ip.String()})
1✔
2119
        }
1✔
2120

2121
        ipUpdated := false
1✔
2122
        if !reflect.DeepEqual(newIngress, service.Status.LoadBalancer.Ingress) {
2✔
2123
                service.Status.LoadBalancer.Ingress = newIngress
1✔
2124

1✔
2125
                logger.WithFields(logrus.Fields{
1✔
2126
                        "status": service.Status.LoadBalancer.Ingress,
1✔
2127
                }).Info("Updating service load balancer status")
1✔
2128

1✔
2129
                ipUpdated = true
1✔
2130
        }
1✔
2131

2132
        success := true
1✔
2133
        reason := "Success"
1✔
2134
        message := ""
1✔
2135
        if len(requestedIps) > 0 && len(requestedIps) != len(meta.staticIngressIps) {
1✔
2136
                success = false
×
2137
                reason = "OneIpNotAllocatable"
×
2138
                message = "One of the requested Ips is not allocatable"
×
UNCOV
2139
        }
×
2140
        condUpdated := cont.updateServiceCondition(service, success, reason, message)
1✔
2141
        if ipUpdated || condUpdated {
2✔
2142
                _, err := cont.updateServiceStatus(service)
1✔
2143
                if err != nil {
1✔
UNCOV
2144
                        logger.Error("Failed to update service status: ", err)
×
UNCOV
2145
                        return true
×
UNCOV
2146
                }
×
2147
        }
2148
        return false
1✔
2149
}
2150

2151
func (cont *AciController) handleServiceDelete(servicekey string) bool {
1✔
2152
        if cont.isCNOEnabled() {
1✔
UNCOV
2153
                return false
×
UNCOV
2154
        }
×
2155
        cont.clearLbService(servicekey)
1✔
2156
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("service-vmm",
1✔
2157
                servicekey))
1✔
2158
        return false
1✔
2159
}
2160

2161
func (cont *AciController) handleServiceUpdate(service *v1.Service) bool {
1✔
2162
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2163
        if err != nil {
1✔
UNCOV
2164
                serviceLogger(cont.log, service).
×
UNCOV
2165
                        Error("Could not create service key: ", err)
×
UNCOV
2166
                return false
×
UNCOV
2167
        }
×
2168
        if cont.isCNOEnabled() {
1✔
UNCOV
2169
                return false
×
UNCOV
2170
        }
×
2171
        var requeue bool
1✔
2172
        isLoadBalancer := service.Spec.Type == v1.ServiceTypeLoadBalancer
1✔
2173
        if isLoadBalancer {
2✔
2174
                if *cont.config.AllocateServiceIps {
2✔
2175
                        requeue = cont.allocateServiceIps(servicekey, service)
1✔
2176
                }
1✔
2177
                // Check if any existing SNAT policy (with no explicit SnatIp)
2178
                // matches this service. This handles the case where the SNAT
2179
                // policy was created before the service existed.
2180
                cont.indexMutex.Lock()
1✔
2181
                if _, exists := cont.snatServices[servicekey]; !exists {
2✔
2182
                        for _, policy := range cont.snatPolicyCache {
1✔
UNCOV
2183
                                if len(policy.SnatIp) == 0 {
×
UNCOV
2184
                                        if len(policy.Selector.Namespace) == 0 ||
×
UNCOV
2185
                                                policy.Selector.Namespace == service.ObjectMeta.Namespace {
×
UNCOV
2186
                                                selector := labels.SelectorFromSet(labels.Set(policy.Selector.Labels))
×
UNCOV
2187
                                                if selector.Matches(labels.Set(service.ObjectMeta.Labels)) {
×
UNCOV
2188
                                                        cont.snatServices[servicekey] = true
×
UNCOV
2189
                                                        break
×
2190
                                                }
2191
                                        }
2192
                                }
2193
                        }
2194
                }
2195
                if cont.serviceSyncEnabled {
2✔
2196
                        cont.indexMutex.Unlock()
1✔
2197
                        err = cont.updateServiceDeviceInstance(servicekey, service)
1✔
2198
                        if err != nil {
1✔
UNCOV
2199
                                serviceLogger(cont.log, service).
×
UNCOV
2200
                                        Error("Failed to update service device Instance: ", err)
×
UNCOV
2201
                                return true
×
UNCOV
2202
                        }
×
2203
                } else {
1✔
2204
                        cont.indexMutex.Unlock()
1✔
2205
                }
1✔
2206
        } else {
1✔
2207
                cont.clearLbService(servicekey)
1✔
2208
        }
1✔
2209
        cont.writeApicSvc(servicekey, service)
1✔
2210
        return requeue
1✔
2211
}
2212

2213
func (cont *AciController) clearLbService(servicekey string) {
1✔
2214
        cont.indexMutex.Lock()
1✔
2215
        if meta, ok := cont.serviceMetaCache[servicekey]; ok {
2✔
2216
                cont.returnServiceIps(meta.ingressIps)
1✔
2217
                returnIps(cont.staticServiceIps, meta.staticIngressIps)
1✔
2218
                delete(cont.serviceMetaCache, servicekey)
1✔
2219
                delete(cont.snatServices, servicekey)
1✔
2220
        }
1✔
2221
        cont.indexMutex.Unlock()
1✔
2222
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("svc", servicekey))
1✔
2223
}
2224

2225
func getEndpointsIps(endpoints *v1.Endpoints) map[string]bool {
1✔
2226
        ips := make(map[string]bool)
1✔
2227
        for _, subset := range endpoints.Subsets {
2✔
2228
                for _, addr := range subset.Addresses {
2✔
2229
                        ips[addr.IP] = true
1✔
2230
                }
1✔
2231
                for _, addr := range subset.NotReadyAddresses {
1✔
UNCOV
2232
                        ips[addr.IP] = true
×
UNCOV
2233
                }
×
2234
        }
2235
        return ips
1✔
2236
}
2237

2238
func (cont *AciController) processServiceTargetPorts(service *v1.Service, svcKey string, old bool) map[string]targetPort {
1✔
2239
        ports := make(map[string]targetPort)
1✔
2240
        for _, port := range service.Spec.Ports {
2✔
2241
                var key string
1✔
2242
                portnums := make(map[int]bool)
1✔
2243

1✔
2244
                if port.TargetPort.Type == intstr.String {
2✔
2245
                        entry, exists := cont.namedPortServiceIndex[svcKey]
1✔
2246
                        if !old {
2✔
2247
                                if !exists {
2✔
2248
                                        cont.log.Debugf("Creating named port index for service: %s, port: %s", svcKey, port.Name)
1✔
2249
                                        newEntry := make(namedPortServiceIndexEntry)
1✔
2250
                                        entry = &newEntry
1✔
2251
                                }
1✔
2252
                                (*entry)[port.Name] = &namedPortServiceIndexPort{
1✔
2253
                                        targetPortName: port.TargetPort.String(),
1✔
2254
                                        resolvedPorts:  make(map[int]bool),
1✔
2255
                                }
1✔
2256
                                cont.namedPortServiceIndex[svcKey] = entry
1✔
2257
                        } else if exists {
2✔
2258
                                delete(*entry, port.Name)
1✔
2259
                                cont.log.Debugf("Removed named port index for service: %s port: %s, entry: %v", svcKey, port.Name, entry)
1✔
2260
                                if len(*entry) == 0 {
2✔
2261
                                        delete(cont.namedPortServiceIndex, svcKey)
1✔
2262
                                } else {
2✔
2263
                                        cont.namedPortServiceIndex[svcKey] = entry
1✔
2264
                                }
1✔
2265
                        }
2266
                        key = portProto(&port.Protocol) + "-name-" + port.TargetPort.String()
1✔
2267
                } else {
1✔
2268
                        portNum := port.TargetPort.IntValue()
1✔
2269
                        if portNum <= 0 {
2✔
2270
                                portNum = int(port.Port)
1✔
2271
                        }
1✔
2272
                        key = portProto(&port.Protocol) + "-num-" + strconv.Itoa(portNum)
1✔
2273
                        portnums[portNum] = true
1✔
2274
                }
2275

2276
                ports[key] = targetPort{
1✔
2277
                        proto: port.Protocol,
1✔
2278
                        ports: portnums,
1✔
2279
                }
1✔
2280
        }
2281
        return ports
1✔
2282
}
2283

2284
func (cont *AciController) endpointsAdded(obj interface{}) {
1✔
2285
        endpoints := obj.(*v1.Endpoints)
1✔
2286
        servicekey, err := cache.MetaNamespaceKeyFunc(obj.(*v1.Endpoints))
1✔
2287
        if err != nil {
1✔
UNCOV
2288
                cont.log.Error("Could not create service key: ", err)
×
UNCOV
2289
                return
×
UNCOV
2290
        }
×
2291

2292
        ips := getEndpointsIps(endpoints)
1✔
2293
        cont.indexMutex.Lock()
1✔
2294
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2295
        cont.queueIPNetPolUpdates(ips)
1✔
2296
        cont.indexMutex.Unlock()
1✔
2297

1✔
2298
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2299

1✔
2300
        cont.queueServiceUpdateByKey(servicekey)
1✔
2301
}
2302

2303
func (cont *AciController) endpointsDeleted(obj interface{}) {
1✔
2304
        endpoints, isEndpoints := obj.(*v1.Endpoints)
1✔
2305
        if !isEndpoints {
1✔
UNCOV
2306
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
UNCOV
2307
                if !ok {
×
UNCOV
2308
                        cont.log.Error("Received unexpected object: ", obj)
×
2309
                        return
×
2310
                }
×
2311
                endpoints, ok = deletedState.Obj.(*v1.Endpoints)
×
2312
                if !ok {
×
2313
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpoints object: ", deletedState.Obj)
×
2314
                        return
×
UNCOV
2315
                }
×
2316
        }
2317
        servicekey, err := cache.MetaNamespaceKeyFunc(endpoints)
1✔
2318
        if err != nil {
1✔
2319
                cont.log.Error("Could not create service key: ", err)
×
2320
                return
×
2321
        }
×
2322

2323
        ips := getEndpointsIps(endpoints)
1✔
2324
        cont.indexMutex.Lock()
1✔
2325
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2326
        cont.queueIPNetPolUpdates(ips)
1✔
2327
        cont.indexMutex.Unlock()
1✔
2328

1✔
2329
        cont.queueEndpointsNetPolUpdates(endpoints)
1✔
2330

1✔
2331
        cont.queueServiceUpdateByKey(servicekey)
1✔
2332
}
2333

2334
func (cont *AciController) endpointsUpdated(oldEps, newEps interface{}) {
1✔
2335
        oldendpoints := oldEps.(*v1.Endpoints)
1✔
2336
        newendpoints := newEps.(*v1.Endpoints)
1✔
2337
        servicekey, err := cache.MetaNamespaceKeyFunc(newendpoints)
1✔
2338
        if err != nil {
1✔
2339
                cont.log.Error("Could not create service key: ", err)
×
2340
                return
×
UNCOV
2341
        }
×
2342

2343
        oldIps := getEndpointsIps(oldendpoints)
1✔
2344
        newIps := getEndpointsIps(newendpoints)
1✔
2345
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2346
                cont.indexMutex.Lock()
1✔
2347
                cont.queueIPNetPolUpdates(oldIps)
1✔
2348
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2349
                cont.queueIPNetPolUpdates(newIps)
1✔
2350
                cont.indexMutex.Unlock()
1✔
2351
        }
1✔
2352

2353
        if !reflect.DeepEqual(oldendpoints.Subsets, newendpoints.Subsets) {
2✔
2354
                cont.queueEndpointsNetPolUpdates(oldendpoints)
1✔
2355
                cont.queueEndpointsNetPolUpdates(newendpoints)
1✔
2356
        }
1✔
2357

2358
        cont.queueServiceUpdateByKey(servicekey)
1✔
2359
}
2360

2361
func (cont *AciController) serviceAdded(obj interface{}) {
1✔
2362
        service := obj.(*v1.Service)
1✔
2363
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2364
        if err != nil {
1✔
UNCOV
2365
                serviceLogger(cont.log, service).
×
UNCOV
2366
                        Error("Could not create service key: ", err)
×
UNCOV
2367
                return
×
UNCOV
2368
        }
×
2369

2370
        cont.indexMutex.Lock()
1✔
2371
        ports := cont.processServiceTargetPorts(service, servicekey, false)
1✔
2372
        cont.queuePortNetPolUpdates(ports)
1✔
2373
        cont.updateTargetPortIndex(true, servicekey, nil, ports)
1✔
2374
        cont.indexMutex.Unlock()
1✔
2375

1✔
2376
        cont.queueServiceUpdateByKey(servicekey)
1✔
2377
}
2378

2379
func (cont *AciController) serviceUpdated(oldSvc, newSvc interface{}) {
1✔
2380
        oldservice := oldSvc.(*v1.Service)
1✔
2381
        newservice := newSvc.(*v1.Service)
1✔
2382
        servicekey, err := cache.MetaNamespaceKeyFunc(newservice)
1✔
2383
        if err != nil {
1✔
UNCOV
2384
                serviceLogger(cont.log, newservice).
×
UNCOV
2385
                        Error("Could not create service key: ", err)
×
UNCOV
2386
                return
×
UNCOV
2387
        }
×
2388
        if !reflect.DeepEqual(oldservice.Spec.Ports, newservice.Spec.Ports) {
1✔
UNCOV
2389
                cont.indexMutex.Lock()
×
UNCOV
2390
                oldPorts := cont.processServiceTargetPorts(oldservice, servicekey, true)
×
UNCOV
2391
                newPorts := cont.processServiceTargetPorts(newservice, servicekey, false)
×
UNCOV
2392
                cont.queuePortNetPolUpdates(oldPorts)
×
UNCOV
2393
                cont.updateTargetPortIndex(true, servicekey, oldPorts, newPorts)
×
UNCOV
2394
                cont.queuePortNetPolUpdates(newPorts)
×
UNCOV
2395
                cont.indexMutex.Unlock()
×
UNCOV
2396
        }
×
2397
        cont.queueServiceUpdateByKey(servicekey)
1✔
2398
}
2399

2400
func (cont *AciController) serviceDeleted(obj interface{}) {
1✔
2401
        service, isService := obj.(*v1.Service)
1✔
2402
        if !isService {
1✔
2403
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
UNCOV
2404
                if !ok {
×
UNCOV
2405
                        serviceLogger(cont.log, service).
×
2406
                                Error("Received unexpected object: ", obj)
×
2407
                        return
×
UNCOV
2408
                }
×
UNCOV
2409
                service, ok = deletedState.Obj.(*v1.Service)
×
UNCOV
2410
                if !ok {
×
UNCOV
2411
                        serviceLogger(cont.log, service).
×
UNCOV
2412
                                Error("DeletedFinalStateUnknown contained non-Services object: ", deletedState.Obj)
×
UNCOV
2413
                        return
×
UNCOV
2414
                }
×
2415
        }
2416
        servicekey, err := cache.MetaNamespaceKeyFunc(service)
1✔
2417
        if err != nil {
1✔
UNCOV
2418
                serviceLogger(cont.log, service).
×
UNCOV
2419
                        Error("Could not create service key: ", err)
×
UNCOV
2420
                return
×
UNCOV
2421
        }
×
2422

2423
        cont.indexMutex.Lock()
1✔
2424
        ports := cont.processServiceTargetPorts(service, servicekey, true)
1✔
2425
        cont.updateTargetPortIndex(true, servicekey, ports, nil)
1✔
2426
        cont.queuePortNetPolUpdates(ports)
1✔
2427
        cont.indexMutex.Unlock()
1✔
2428

1✔
2429
        deletedServiceKey := "DELETED_" + servicekey
1✔
2430
        cont.queueServiceUpdateByKey(deletedServiceKey)
1✔
2431
}
2432

2433
func (cont *AciController) serviceFullSync() {
1✔
2434
        cache.ListAll(cont.serviceIndexer, labels.Everything(),
1✔
2435
                func(sobj interface{}) {
2✔
2436
                        cont.queueServiceUpdate(sobj.(*v1.Service))
1✔
2437
                })
1✔
2438
}
2439

2440
func (cont *AciController) getEndpointSliceIps(endpointSlice *discovery.EndpointSlice) map[string]bool {
1✔
2441
        ips := make(map[string]bool)
1✔
2442
        for _, endpoints := range endpointSlice.Endpoints {
2✔
2443
                for _, addr := range endpoints.Addresses {
2✔
2444
                        ips[addr] = true
1✔
2445
                }
1✔
2446
        }
2447
        return ips
1✔
2448
}
2449

UNCOV
2450
func (cont *AciController) notReadyEndpointPresent(endpointSlice *discovery.EndpointSlice) bool {
×
2451
        for _, endpoints := range endpointSlice.Endpoints {
×
2452
                if (endpoints.Conditions.Ready != nil && !*endpoints.Conditions.Ready) &&
×
2453
                        (endpoints.Conditions.Terminating == nil || !*endpoints.Conditions.Terminating) {
×
2454
                        return true
×
2455
                }
×
2456
        }
2457
        return false
×
2458
}
2459

UNCOV
2460
func (cont *AciController) getEndpointSliceEpIps(endpoints *discovery.Endpoint) map[string]bool {
×
2461
        ips := make(map[string]bool)
×
2462
        for _, addr := range endpoints.Addresses {
×
2463
                ips[addr] = true
×
2464
        }
×
2465
        return ips
×
2466
}
2467

2468
func (cont *AciController) processDelayedEpSlices() {
1✔
2469
        var processEps []DelayedEpSlice
1✔
2470
        cont.indexMutex.Lock()
1✔
2471
        for i := 0; i < len(cont.delayedEpSlices); i++ {
1✔
2472
                delayedepslice := cont.delayedEpSlices[i]
×
2473
                if time.Now().After(delayedepslice.DelayedTime) {
×
2474
                        var toprocess DelayedEpSlice
×
UNCOV
2475
                        err := util.DeepCopyObj(&delayedepslice, &toprocess)
×
UNCOV
2476
                        if err != nil {
×
UNCOV
2477
                                cont.log.Error(err)
×
2478
                                continue
×
2479
                        }
2480
                        processEps = append(processEps, toprocess)
×
2481
                        cont.delayedEpSlices = append(cont.delayedEpSlices[:i], cont.delayedEpSlices[i+1:]...)
×
2482
                }
2483
        }
2484

2485
        cont.indexMutex.Unlock()
1✔
2486
        for _, epslice := range processEps {
1✔
2487
                //ignore the epslice if newly added endpoint is not ready
×
2488
                if cont.notReadyEndpointPresent(epslice.NewEpSlice) {
×
2489
                        cont.log.Debug("Ignoring the update as the new endpoint is not ready : ", epslice.NewEpSlice)
×
UNCOV
2490
                } else {
×
UNCOV
2491
                        cont.log.Debug("Processing update of epslice : ", epslice.NewEpSlice)
×
UNCOV
2492
                        cont.doendpointSliceUpdated(epslice.OldEpSlice, epslice.NewEpSlice)
×
UNCOV
2493
                }
×
2494
        }
2495
}
2496

2497
func (cont *AciController) resolveServiceNamedPortFromEpSlice(epSlice *discovery.EndpointSlice, serviceKey string, old bool) {
1✔
2498
        indexEntry, ok := cont.namedPortServiceIndex[serviceKey]
1✔
2499
        if !ok {
2✔
2500
                return
1✔
2501
        }
1✔
2502
        for _, port := range epSlice.Ports {
2✔
2503
                if port.Name == nil || port.Port == nil {
1✔
2504
                        continue
×
2505
                }
2506
                if portEntry, ok := (*indexEntry)[*port.Name]; ok && portEntry != nil {
2✔
2507
                        portNum := int(*port.Port)
1✔
2508
                        if old {
2✔
2509
                                delete(portEntry.resolvedPorts, portNum)
1✔
2510
                                cont.log.Debugf("Deleting port: %d from service %s resolved target port. Resolved ports: %v", portNum, serviceKey, portEntry.resolvedPorts)
1✔
2511
                        } else {
2✔
2512
                                portEntry.resolvedPorts[portNum] = true
1✔
2513
                                cont.log.Debugf("Adding port: %d to service %s resolved target port. Resolved ports: %v", portNum, serviceKey, portEntry.resolvedPorts)
1✔
2514
                        }
1✔
2515
                        key := portProto(port.Protocol) + "-num-" + strconv.Itoa(portNum)
1✔
2516
                        targetPortIndexEntry := cont.targetPortIndex[key]
1✔
2517
                        if targetPortIndexEntry == nil && len(portEntry.resolvedPorts) == 1 {
2✔
2518
                                targetPortIndexEntry = &portIndexEntry{
1✔
2519
                                        port: targetPort{
1✔
2520
                                                proto: *port.Protocol,
1✔
2521
                                                ports: make(map[int]bool),
1✔
2522
                                        },
1✔
2523
                                        serviceKeys:       make(map[string]bool),
1✔
2524
                                        networkPolicyKeys: make(map[string]bool),
1✔
2525
                                }
1✔
2526
                                targetPortIndexEntry.port.ports[portNum] = true
1✔
2527
                                cont.targetPortIndex[key] = targetPortIndexEntry
1✔
2528
                        }
1✔
2529
                        if targetPortIndexEntry != nil {
2✔
2530
                                if len(portEntry.resolvedPorts) == 1 {
2✔
2531
                                        targetPortIndexEntry.serviceKeys[serviceKey] = true
1✔
2532
                                } else {
2✔
2533
                                        delete(targetPortIndexEntry.serviceKeys, serviceKey)
1✔
2534
                                }
1✔
2535
                        }
2536
                }
2537
        }
2538
}
2539
func (cont *AciController) endpointSliceAdded(obj interface{}) {
1✔
2540
        endpointslice, ok := obj.(*discovery.EndpointSlice)
1✔
2541
        if !ok {
1✔
2542
                cont.log.Error("error processing Endpointslice object: ", obj)
×
2543
                return
×
2544
        }
×
2545
        servicekey, valid := getServiceKey(endpointslice)
1✔
2546
        if !valid {
1✔
UNCOV
2547
                return
×
UNCOV
2548
        }
×
2549
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2550
        cont.indexMutex.Lock()
1✔
2551
        cont.updateIpIndex(cont.endpointsIpIndex, nil, ips, servicekey)
1✔
2552
        cont.resolveServiceNamedPortFromEpSlice(endpointslice, servicekey, false)
1✔
2553
        cont.queueIPNetPolUpdates(ips)
1✔
2554
        cont.indexMutex.Unlock()
1✔
2555

1✔
2556
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2557

1✔
2558
        cont.queueServiceUpdateByKey(servicekey)
1✔
2559
        cont.log.Info("EndPointSlice Object Added: ", servicekey)
1✔
2560
}
2561

2562
func (cont *AciController) endpointSliceDeleted(obj interface{}) {
1✔
2563
        endpointslice, isEndpointslice := obj.(*discovery.EndpointSlice)
1✔
2564
        if !isEndpointslice {
1✔
UNCOV
2565
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
UNCOV
2566
                if !ok {
×
UNCOV
2567
                        cont.log.Error("Received unexpected object: ", obj)
×
UNCOV
2568
                        return
×
UNCOV
2569
                }
×
UNCOV
2570
                endpointslice, ok = deletedState.Obj.(*discovery.EndpointSlice)
×
UNCOV
2571
                if !ok {
×
UNCOV
2572
                        cont.log.Error("DeletedFinalStateUnknown contained non-Endpointslice object: ", deletedState.Obj)
×
UNCOV
2573
                        return
×
UNCOV
2574
                }
×
2575
        }
2576
        servicekey, valid := getServiceKey(endpointslice)
1✔
2577
        if !valid {
1✔
UNCOV
2578
                return
×
UNCOV
2579
        }
×
2580
        ips := cont.getEndpointSliceIps(endpointslice)
1✔
2581
        cont.indexMutex.Lock()
1✔
2582
        cont.updateIpIndex(cont.endpointsIpIndex, ips, nil, servicekey)
1✔
2583
        cont.resolveServiceNamedPortFromEpSlice(endpointslice, servicekey, true)
1✔
2584
        cont.queueIPNetPolUpdates(ips)
1✔
2585
        cont.indexMutex.Unlock()
1✔
2586
        cont.queueEndpointSliceNetPolUpdates(endpointslice)
1✔
2587
        cont.queueServiceUpdateByKey(servicekey)
1✔
2588
}
2589

2590
// Checks if the given service is present in the user configured list of services
2591
// for pbr delay and if present, returns the servie specific delay if configured
UNCOV
2592
func (cont *AciController) svcInAddDelayList(name, ns string) (int, bool) {
×
UNCOV
2593
        for _, svc := range cont.config.ServiceGraphEndpointAddDelay.Services {
×
UNCOV
2594
                if svc.Name == name && svc.Namespace == ns {
×
UNCOV
2595
                        return svc.Delay, true
×
UNCOV
2596
                }
×
2597
        }
UNCOV
2598
        return 0, false
×
2599
}
2600

2601
// Check if the endpointslice update notification has any deletion of enpoint
2602
func (cont *AciController) isDeleteEndpointSlice(oldendpointslice, newendpointslice *discovery.EndpointSlice) bool {
×
2603
        del := false
×
UNCOV
2604

×
UNCOV
2605
        // if any endpoint is removed from endpontslice
×
UNCOV
2606
        if len(newendpointslice.Endpoints) < len(oldendpointslice.Endpoints) {
×
2607
                del = true
×
2608
        }
×
2609

2610
        if !del {
×
2611
                // if any one of the endpoint is in terminating state
×
2612
                for _, endpoint := range newendpointslice.Endpoints {
×
UNCOV
2613
                        if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
×
UNCOV
2614
                                del = true
×
UNCOV
2615
                                break
×
2616
                        }
2617
                }
2618
        }
UNCOV
2619
        if !del {
×
UNCOV
2620
                // if any one of endpoint moved from ready state to not-ready state
×
UNCOV
2621
                for ix := range oldendpointslice.Endpoints {
×
UNCOV
2622
                        oldips := cont.getEndpointSliceEpIps(&oldendpointslice.Endpoints[ix])
×
UNCOV
2623
                        for newIx := range newendpointslice.Endpoints {
×
UNCOV
2624
                                newips := cont.getEndpointSliceEpIps(&newendpointslice.Endpoints[newIx])
×
UNCOV
2625
                                if reflect.DeepEqual(oldips, newips) {
×
UNCOV
2626
                                        if (oldendpointslice.Endpoints[ix].Conditions.Ready != nil && *oldendpointslice.Endpoints[ix].Conditions.Ready) &&
×
2627
                                                (newendpointslice.Endpoints[newIx].Conditions.Ready != nil && !*newendpointslice.Endpoints[newIx].Conditions.Ready) {
×
2628
                                                del = true
×
UNCOV
2629
                                        }
×
UNCOV
2630
                                        break
×
2631
                                }
2632
                        }
2633
                }
2634
        }
UNCOV
2635
        return del
×
2636
}
2637

2638
func (cont *AciController) doendpointSliceUpdatedDelay(oldendpointslice *discovery.EndpointSlice,
UNCOV
2639
        newendpointslice *discovery.EndpointSlice) {
×
UNCOV
2640
        svc, ns, valid := getServiceNameAndNs(newendpointslice)
×
2641
        if !valid {
×
2642
                return
×
UNCOV
2643
        }
×
UNCOV
2644
        svckey, valid := getServiceKey(newendpointslice)
×
UNCOV
2645
        if !valid {
×
UNCOV
2646
                return
×
UNCOV
2647
        }
×
UNCOV
2648
        delay := cont.config.ServiceGraphEndpointAddDelay.Delay
×
UNCOV
2649
        svcDelay, exists := cont.svcInAddDelayList(svc, ns)
×
UNCOV
2650
        if svcDelay > 0 {
×
UNCOV
2651
                delay = svcDelay
×
UNCOV
2652
        }
×
UNCOV
2653
        delayedsvc := exists && delay > 0
×
UNCOV
2654
        if delayedsvc {
×
2655
                cont.log.Debug("Delay of ", delay, " seconds is applicable for svc :", svc, " in ns: ", ns)
×
2656
                var delayedepslice DelayedEpSlice
×
2657
                delayedepslice.OldEpSlice = oldendpointslice
×
2658
                delayedepslice.ServiceKey = svckey
×
2659
                delayedepslice.NewEpSlice = newendpointslice
×
2660
                currentTime := time.Now()
×
2661
                delayedepslice.DelayedTime = currentTime.Add(time.Duration(delay) * time.Second)
×
2662
                cont.indexMutex.Lock()
×
2663
                cont.delayedEpSlices = append(cont.delayedEpSlices, &delayedepslice)
×
2664
                cont.indexMutex.Unlock()
×
2665
        } else {
×
2666
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
2667
        }
×
2668

2669
        if delayedsvc && cont.isDeleteEndpointSlice(oldendpointslice, newendpointslice) {
×
UNCOV
2670
                cont.log.Debug("Proceeding by ignoring delay as the update is due to delete of endpoint")
×
UNCOV
2671
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
×
UNCOV
2672
        }
×
2673
}
2674

2675
func (cont *AciController) endpointSliceUpdated(oldobj, newobj interface{}) {
1✔
2676
        oldendpointslice, ok := oldobj.(*discovery.EndpointSlice)
1✔
2677
        if !ok {
1✔
2678
                cont.log.Error("error processing Endpointslice object: ", oldobj)
×
UNCOV
2679
                return
×
UNCOV
2680
        }
×
2681
        newendpointslice, ok := newobj.(*discovery.EndpointSlice)
1✔
2682
        if !ok {
1✔
2683
                cont.log.Error("error processing Endpointslice object: ", newobj)
×
2684
                return
×
2685
        }
×
2686
        if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
2687
                cont.doendpointSliceUpdatedDelay(oldendpointslice, newendpointslice)
×
2688
        } else {
1✔
2689
                cont.doendpointSliceUpdated(oldendpointslice, newendpointslice)
1✔
2690
        }
1✔
2691
}
2692

2693
func (cont *AciController) doendpointSliceUpdated(oldendpointslice *discovery.EndpointSlice,
2694
        newendpointslice *discovery.EndpointSlice) {
1✔
2695
        servicekey, valid := getServiceKey(newendpointslice)
1✔
2696
        if !valid {
1✔
2697
                return
×
UNCOV
2698
        }
×
2699
        oldIps := cont.getEndpointSliceIps(oldendpointslice)
1✔
2700
        newIps := cont.getEndpointSliceIps(newendpointslice)
1✔
2701
        if !reflect.DeepEqual(oldIps, newIps) {
2✔
2702
                cont.indexMutex.Lock()
1✔
2703
                cont.resolveServiceNamedPortFromEpSlice(oldendpointslice, servicekey, true)
1✔
2704
                cont.resolveServiceNamedPortFromEpSlice(newendpointslice, servicekey, false)
1✔
2705
                cont.queueIPNetPolUpdates(oldIps)
1✔
2706
                cont.updateIpIndex(cont.endpointsIpIndex, oldIps, newIps, servicekey)
1✔
2707
                cont.queueIPNetPolUpdates(newIps)
1✔
2708
                cont.indexMutex.Unlock()
1✔
2709
        }
1✔
2710

2711
        if !reflect.DeepEqual(oldendpointslice.Endpoints, newendpointslice.Endpoints) {
2✔
2712
                cont.queueEndpointSliceNetPolUpdates(oldendpointslice)
1✔
2713
                cont.queueEndpointSliceNetPolUpdates(newendpointslice)
1✔
2714
        }
1✔
2715
        cont.log.Debug("EndPointSlice Object Update: ", servicekey)
1✔
2716
        cont.queueServiceUpdateByKey(servicekey)
1✔
2717
}
2718

2719
func (cont *AciController) queueEndpointSliceNetPolUpdates(endpointslice *discovery.EndpointSlice) {
1✔
2720
        for _, endpoint := range endpointslice.Endpoints {
2✔
2721
                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" ||
1✔
2722
                        endpoint.TargetRef.Namespace == "" || endpoint.TargetRef.Name == "" {
2✔
2723
                        continue
1✔
2724
                }
2725
                if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
1✔
UNCOV
2726
                        continue
×
2727
                }
2728
                podkey := endpoint.TargetRef.Namespace + "/" + endpoint.TargetRef.Name
1✔
2729
                npkeys := cont.netPolEgressPods.GetObjForPod(podkey)
1✔
2730
                ps := make(map[string]bool)
1✔
2731
                for _, npkey := range npkeys {
2✔
2732
                        cont.queueNetPolUpdateByKey(npkey)
1✔
2733
                }
1✔
2734
                // Process if the  any matching namedport wildcard policy is present
2735
                // ignore np already processed policies
2736
                cont.queueMatchingNamedNp(ps, podkey)
1✔
2737
        }
2738
}
2739

2740
func getServiceKey(endPointSlice *discovery.EndpointSlice) (string, bool) {
1✔
2741
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
1✔
2742
        if !ok {
1✔
UNCOV
2743
                return "", false
×
UNCOV
2744
        }
×
2745
        return endPointSlice.ObjectMeta.Namespace + "/" + serviceName, true
1✔
2746
}
2747

UNCOV
2748
func getServiceNameAndNs(endPointSlice *discovery.EndpointSlice) (string, string, bool) {
×
UNCOV
2749
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
×
UNCOV
2750
        if !ok {
×
UNCOV
2751
                return "", "", false
×
UNCOV
2752
        }
×
UNCOV
2753
        return serviceName, endPointSlice.ObjectMeta.Namespace, true
×
2754
}
2755

2756
// can be called with index lock
2757
func (sep *serviceEndpoint) UpdateServicesForNode(nodename string) {
1✔
2758
        cont := sep.cont
1✔
2759
        cache.ListAll(cont.endpointsIndexer, labels.Everything(),
1✔
2760
                func(endpointsobj interface{}) {
2✔
2761
                        endpoints := endpointsobj.(*v1.Endpoints)
1✔
2762
                        for _, subset := range endpoints.Subsets {
2✔
2763
                                for _, addr := range subset.Addresses {
2✔
2764
                                        if addr.NodeName != nil && *addr.NodeName == nodename {
2✔
2765
                                                servicekey, err :=
1✔
2766
                                                        cache.MetaNamespaceKeyFunc(endpointsobj.(*v1.Endpoints))
1✔
2767
                                                if err != nil {
1✔
2768
                                                        cont.log.Error("Could not create endpoints key: ", err)
×
2769
                                                        return
×
2770
                                                }
×
2771
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2772
                                                return
1✔
2773
                                        }
2774
                                }
2775
                        }
2776
                })
2777
}
2778

2779
func (seps *serviceEndpointSlice) UpdateServicesForNode(nodename string) {
1✔
2780
        // 1. List all the endpointslice and check for matching nodename
1✔
2781
        // 2. if it matches trigger the Service update and mark it visited
1✔
2782
        cont := seps.cont
1✔
2783
        visited := make(map[string]bool)
1✔
2784
        cache.ListAll(cont.endpointSliceIndexer, labels.Everything(),
1✔
2785
                func(endpointSliceobj interface{}) {
2✔
2786
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2787
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2788
                                if endpoint.NodeName != nil && *endpoint.NodeName == nodename {
2✔
2789
                                        servicekey, valid := getServiceKey(endpointSlices)
1✔
2790
                                        if !valid {
1✔
UNCOV
2791
                                                return
×
UNCOV
2792
                                        }
×
2793
                                        if _, ok := visited[servicekey]; !ok {
2✔
2794
                                                cont.queueServiceUpdateByKey(servicekey)
1✔
2795
                                                visited[servicekey] = true
1✔
2796
                                                return
1✔
2797
                                        }
1✔
2798
                                }
2799
                        }
2800
                })
2801
}
2802
func (cont *AciController) setNodeMap(nodeMap map[string]*metadata.ServiceEndpoint, nodeName string) {
1✔
2803
        nodeMeta, ok := cont.nodeServiceMetaCache[nodeName]
1✔
2804
        if !ok {
2✔
2805
                return
1✔
2806
        }
1✔
2807
        _, ok = cont.fabricPathForNode(nodeName)
1✔
2808
        if !ok {
2✔
2809
                return
1✔
2810
        }
1✔
2811
        nodeMap[nodeName] = &nodeMeta.serviceEp
1✔
2812
}
2813

2814
// 2 cases when epslices corresponding to given service is presnt in delayedEpSlices:
2815
//  1. endpoint not present in delayedEpSlices of the service
2816
//  2. endpoint present in delayedEpSlices of the service but in not ready state
2817
//
2818
// indexMutex lock must be acquired before calling the function
UNCOV
2819
func (cont *AciController) isDelayedEndpoint(endpoint *discovery.Endpoint, svckey string) bool {
×
UNCOV
2820
        delayed := false
×
UNCOV
2821
        endpointips := cont.getEndpointSliceEpIps(endpoint)
×
UNCOV
2822
        for _, delayedepslices := range cont.delayedEpSlices {
×
UNCOV
2823
                if delayedepslices.ServiceKey == svckey {
×
UNCOV
2824
                        var found bool
×
UNCOV
2825
                        epslice := delayedepslices.OldEpSlice
×
UNCOV
2826
                        for ix := range epslice.Endpoints {
×
UNCOV
2827
                                epips := cont.getEndpointSliceEpIps(&epslice.Endpoints[ix])
×
UNCOV
2828
                                if reflect.DeepEqual(endpointips, epips) {
×
UNCOV
2829
                                        // case 2
×
UNCOV
2830
                                        if epslice.Endpoints[ix].Conditions.Ready != nil && !*epslice.Endpoints[ix].Conditions.Ready {
×
UNCOV
2831
                                                delayed = true
×
UNCOV
2832
                                        }
×
UNCOV
2833
                                        found = true
×
2834
                                }
2835
                        }
2836
                        // case 1
UNCOV
2837
                        if !found {
×
UNCOV
2838
                                delayed = true
×
UNCOV
2839
                        }
×
2840
                }
2841
        }
UNCOV
2842
        return delayed
×
2843
}
2844

2845
// set nodemap only if endoint is ready and not in delayedEpSlices
2846
func (cont *AciController) setNodeMapDelay(nodeMap map[string]*metadata.ServiceEndpoint,
UNCOV
2847
        endpoint *discovery.Endpoint, service *v1.Service) {
×
UNCOV
2848
        svckey, err := cache.MetaNamespaceKeyFunc(service)
×
UNCOV
2849
        if err != nil {
×
UNCOV
2850
                cont.log.Error("Could not create service key: ", err)
×
UNCOV
2851
                return
×
UNCOV
2852
        }
×
UNCOV
2853
        if cont.config.NoWaitForServiceEpReadiness ||
×
UNCOV
2854
                (endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready) {
×
UNCOV
2855
                if endpoint.NodeName != nil && *endpoint.NodeName != "" {
×
UNCOV
2856
                        // donot setNodeMap for endpoint if:
×
UNCOV
2857
                        //   endpoint is newly added
×
UNCOV
2858
                        //   endpoint status changed from not ready to ready
×
UNCOV
2859
                        if !cont.isDelayedEndpoint(endpoint, svckey) {
×
UNCOV
2860
                                cont.setNodeMap(nodeMap, *endpoint.NodeName)
×
UNCOV
2861
                        }
×
2862
                }
2863
        }
2864
}
2865

2866
func (sep *serviceEndpoint) GetnodesMetadata(key string,
2867
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2868
        cont := sep.cont
1✔
2869
        endpointsobj, exists, err := cont.endpointsIndexer.GetByKey(key)
1✔
2870
        if err != nil {
1✔
UNCOV
2871
                cont.log.Error("Could not lookup endpoints for " +
×
UNCOV
2872
                        key + ": " + err.Error())
×
UNCOV
2873
        }
×
2874
        if exists && endpointsobj != nil {
2✔
2875
                endpoints := endpointsobj.(*v1.Endpoints)
1✔
2876
                for _, subset := range endpoints.Subsets {
2✔
2877
                        for _, addr := range subset.Addresses {
2✔
2878
                                if addr.NodeName == nil {
2✔
2879
                                        continue
1✔
2880
                                }
2881
                                cont.setNodeMap(nodeMap, *addr.NodeName)
1✔
2882
                        }
2883
                }
2884
        }
2885
        cont.log.Info("NodeMap: ", nodeMap)
1✔
2886
}
2887

2888
func (seps *serviceEndpointSlice) GetnodesMetadata(key string,
2889
        service *v1.Service, nodeMap map[string]*metadata.ServiceEndpoint) {
1✔
2890
        cont := seps.cont
1✔
2891
        // 1. Get all the Endpoint slices matching the label service-name
1✔
2892
        // 2. update the node map matching with endpoints nodes name
1✔
2893
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2894
        selector := labels.SelectorFromSet(label)
1✔
2895
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2896
                func(endpointSliceobj interface{}) {
2✔
2897
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2898
                        for ix := range endpointSlices.Endpoints {
2✔
2899
                                if cont.config.ServiceGraphEndpointAddDelay.Delay > 0 {
1✔
UNCOV
2900
                                        cont.setNodeMapDelay(nodeMap, &endpointSlices.Endpoints[ix], service)
×
2901
                                } else if cont.config.NoWaitForServiceEpReadiness ||
1✔
2902
                                        (endpointSlices.Endpoints[ix].Conditions.Ready != nil && *endpointSlices.Endpoints[ix].Conditions.Ready) {
2✔
2903
                                        if endpointSlices.Endpoints[ix].NodeName != nil && *endpointSlices.Endpoints[ix].NodeName != "" {
2✔
2904
                                                cont.setNodeMap(nodeMap, *endpointSlices.Endpoints[ix].NodeName)
1✔
2905
                                        }
1✔
2906
                                }
2907
                        }
2908
                })
2909
        cont.log.Debug("NodeMap: ", nodeMap)
1✔
2910
}
2911

2912
func (sep *serviceEndpoint) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2913
        cont := sep.cont
1✔
2914
        key, err := cache.MetaNamespaceKeyFunc(service)
1✔
2915
        if err != nil {
1✔
UNCOV
2916
                serviceLogger(cont.log, service).
×
UNCOV
2917
                        Error("Could not create service key: ", err)
×
UNCOV
2918
                return false
×
UNCOV
2919
        }
×
2920
        endpointsobj, _, err := cont.endpointsIndexer.GetByKey(key)
1✔
2921
        if err != nil {
1✔
UNCOV
2922
                cont.log.Error("Could not lookup endpoints for " +
×
UNCOV
2923
                        key + ": " + err.Error())
×
UNCOV
2924
                return false
×
UNCOV
2925
        }
×
2926
        if endpointsobj != nil {
2✔
2927
                for _, subset := range endpointsobj.(*v1.Endpoints).Subsets {
2✔
2928
                        for _, addr := range subset.Addresses {
2✔
2929
                                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" {
1✔
UNCOV
2930
                                        continue
×
2931
                                }
2932
                                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(),
1✔
2933
                                        addr.TargetRef.Name))
1✔
2934
                        }
2935
                }
2936
        }
2937
        return true
1✔
2938
}
2939

2940
func (seps *serviceEndpointSlice) SetServiceApicObject(aobj apicapi.ApicObject, service *v1.Service) bool {
1✔
2941
        cont := seps.cont
1✔
2942
        label := map[string]string{discovery.LabelServiceName: service.ObjectMeta.Name}
1✔
2943
        selector := labels.SelectorFromSet(label)
1✔
2944
        epcount := 0
1✔
2945
        childs := make(map[string]struct{})
1✔
2946
        var exists = struct{}{}
1✔
2947
        cache.ListAllByNamespace(cont.endpointSliceIndexer, service.ObjectMeta.Namespace, selector,
1✔
2948
                func(endpointSliceobj interface{}) {
2✔
2949
                        endpointSlices := endpointSliceobj.(*discovery.EndpointSlice)
1✔
2950
                        for _, endpoint := range endpointSlices.Endpoints {
2✔
2951
                                if endpoint.TargetRef == nil || endpoint.TargetRef.Kind != "Pod" {
1✔
UNCOV
2952
                                        continue
×
2953
                                }
2954
                                epcount++
1✔
2955
                                childs[endpoint.TargetRef.Name] = exists
1✔
2956
                                cont.log.Debug("EndPoint added: ", endpoint.TargetRef.Name)
1✔
2957
                        }
2958
                })
2959
        for child := range childs {
2✔
2960
                aobj.AddChild(apicapi.NewVmmInjectedSvcEp(aobj.GetDn(), child))
1✔
2961
        }
1✔
2962
        return epcount != 0
1✔
2963
}
2964

2965
func getProtocolStr(proto v1.Protocol) string {
1✔
2966
        var protostring string
1✔
2967
        switch proto {
1✔
2968
        case v1.ProtocolUDP:
1✔
2969
                protostring = "udp"
1✔
2970
        case v1.ProtocolTCP:
1✔
2971
                protostring = "tcp"
1✔
UNCOV
2972
        case v1.ProtocolSCTP:
×
UNCOV
2973
                protostring = "sctp"
×
UNCOV
2974
        default:
×
UNCOV
2975
                protostring = "tcp"
×
2976
        }
2977
        return protostring
1✔
2978
}
2979

UNCOV
2980
func (cont *AciController) removeIpFromIngressIPList(ingressIps *[]net.IP, ip net.IP) {
×
UNCOV
2981
        cont.returnServiceIps([]net.IP{ip})
×
UNCOV
2982
        index := -1
×
UNCOV
2983
        for i, v := range *ingressIps {
×
UNCOV
2984
                if v.Equal(ip) {
×
UNCOV
2985
                        index = i
×
UNCOV
2986
                        break
×
2987
                }
2988
        }
UNCOV
2989
        if index == -1 {
×
UNCOV
2990
                return
×
UNCOV
2991
        }
×
UNCOV
2992
        *ingressIps = append((*ingressIps)[:index], (*ingressIps)[index+1:]...)
×
2993
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc