• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 9623

17 Aug 2024 08:14PM UTC coverage: 70.854% (-0.05%) from 70.908%
9623

push

travis-pro

abhis2112
Adding packet tracing feature for ovs datapath

13035 of 18397 relevant lines covered (70.85%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.1
/pkg/hostagent/services.go
1
// Copyright 2016 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package hostagent
16

17
import (
18
        "context"
19
        "encoding/json"
20
        "fmt"
21
        "net"
22
        "os"
23
        "path/filepath"
24
        "reflect"
25
        "strings"
26

27
        "github.com/noironetworks/aci-containers/pkg/util"
28
        configv1 "github.com/openshift/api/config/v1"
29
        "github.com/sirupsen/logrus"
30
        v1 "k8s.io/api/core/v1"
31
        discovery "k8s.io/api/discovery/v1"
32
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
        "k8s.io/apimachinery/pkg/labels"
34
        "k8s.io/apimachinery/pkg/runtime"
35
        "k8s.io/apimachinery/pkg/types"
36
        "k8s.io/apimachinery/pkg/watch"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/kubernetes/pkg/controller"
40
        "sigs.k8s.io/controller-runtime/pkg/client"
41
        "sigs.k8s.io/controller-runtime/pkg/client/config"
42
)
43

44
type opflexServiceMapping struct {
45
        ServiceIp    string `json:"service-ip,omitempty"`
46
        ServiceProto string `json:"service-proto,omitempty"`
47
        ServicePort  uint16 `json:"service-port,omitempty"`
48

49
        NextHopIps  []string `json:"next-hop-ips"`
50
        NextHopPort uint16   `json:"next-hop-port,omitempty"`
51

52
        Conntrack       bool                         `json:"conntrack-enabled"`
53
        NodePort        uint16                       `json:"node-port,omitempty"`
54
        SessionAffinity *opflexSessionAffinityConfig `json:"session-affinity,omitempty"`
55
}
56

57
type opflexService struct {
58
        Uuid string `json:"uuid"`
59

60
        DomainPolicySpace string `json:"domain-policy-space,omitempty"`
61
        DomainName        string `json:"domain-name,omitempty"`
62

63
        ServiceMode   string `json:"service-mode,omitempty"`
64
        ServiceMac    string `json:"service-mac,omitempty"`
65
        InterfaceName string `json:"interface-name,omitempty"`
66
        InterfaceIp   string `json:"interface-ip,omitempty"`
67
        InterfaceVlan uint16 `json:"interface-vlan,omitempty"`
68
        ServiceType   string `json:"service-type,omitempty"`
69

70
        ServiceMappings []opflexServiceMapping `json:"service-mapping"`
71

72
        Attributes map[string]string `json:"attributes,omitempty"`
73
}
74

75
type opflexSessionAffinityConfig struct {
76
        // clientIP contains the configurations of Client IP based session affinity.
77
        ClientIP opflexClientIPConfig `json:"client-ip,omitempty"`
78
}
79

80
// ClientIPConfig represents the configurations of Client IP based session affinity.
81
type opflexClientIPConfig struct {
82
        // timeoutSeconds specifies the seconds of ClientIP type session sticky time.
83
        // The value must be >0 && <=86400(for 1 day) if ServiceAffinity == "ClientIP".
84
        // Default value is 10800(for 3 hours).
85
        TimeoutSeconds int32 `json:"timeout-seconds,omitempty"`
86
}
87

88
// Default Session value is 10800(for 3 hours)
89
const (
90
        DefaultSessionAffinityTimer = 10800
91
        TempSessionAffinityTimer    = 1
92
)
93

94
// Name of the Openshift Service
95
const (
96
        RouterInternalDefault string = "router-internal-default"
97
)
98

99
// Namespace of Openshift Service
100
const (
101
        OpenShiftIngressNs string = "openshift-ingress"
102
)
103

104
// Represent the Openshift services
105
type opflexOcService struct {
106
        Name      string
107
        Namespace string
108
}
109

110
var Version = map[string]bool{
111
        "openshift-4.6-baremetal":              true,
112
        "openshift-4.7-baremetal":              true,
113
        "openshift-4.8-baremetal":              true,
114
        "openshift-4.9-baremetal":              true,
115
        "openshift-4.10-baremetal":             true,
116
        "openshift-4.11-baremetal":             true,
117
        "openshift-4.12-baremetal":             true,
118
        "openshift-4.13-baremetal":             true,
119
        "openshift-4.14-baremetal":             true,
120
        "openshift-4.15-baremetal":             true,
121
        "openshift-4.14-agent-based-baremetal": true,
122
        "openshift-4.4-esx":                    true,
123
        "openshift-4.5-esx":                    true,
124
        "openshift-4.6-esx":                    true,
125
        "openshift-4.7-esx":                    true,
126
        "openshift-4.8-esx":                    true,
127
        "openshift-4.9-esx":                    true,
128
        "openshift-4.10-esx":                   true,
129
        "openshift-4.11-esx":                   true,
130
        "openshift-4.12-esx":                   true,
131
        "openshift-4.13-esx":                   true,
132
        "openshift-4.14-esx":                   true,
133
        "openshift-4.15-esx":                   true,
134
        "openshift-4.14-agent-based-esx":       true,
135
}
136

137
func (agent *HostAgent) initEndpointsInformerFromClient(
138
        kubeClient *kubernetes.Clientset) {
×
139
        agent.initEndpointsInformerBase(
×
140
                &cache.ListWatch{
×
141
                        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
×
142
                                obj, err := kubeClient.CoreV1().Endpoints(metav1.NamespaceAll).List(context.TODO(), options)
×
143
                                if err != nil {
×
144
                                        agent.log.Fatalf("Failed to list Endpoints during initialization of EndpointsInformer: %s", err)
×
145
                                }
×
146
                                return obj, err
×
147
                        },
148
                        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
×
149
                                obj, err := kubeClient.CoreV1().Endpoints(metav1.NamespaceAll).Watch(context.TODO(), options)
×
150
                                if err != nil {
×
151
                                        agent.log.Fatalf("Failed to watch Endpoints during initialization of EndpointsInformer: %s", err)
×
152
                                }
×
153
                                return obj, err
×
154
                        },
155
                })
156
}
157

158
func (agent *HostAgent) initEndpointsInformerBase(listWatch *cache.ListWatch) {
1✔
159
        agent.endpointsInformer = cache.NewSharedIndexInformer(
1✔
160
                listWatch,
1✔
161
                &v1.Endpoints{},
1✔
162
                controller.NoResyncPeriodFunc(),
1✔
163
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
1✔
164
        )
1✔
165
        agent.endpointsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
1✔
166
                AddFunc: func(obj interface{}) {
2✔
167
                        agent.endpointsChanged(obj)
1✔
168
                },
1✔
169
                UpdateFunc: func(_ interface{}, obj interface{}) {
×
170
                        agent.endpointsChanged(obj)
×
171
                },
×
172
                DeleteFunc: func(obj interface{}) {
×
173
                        agent.endpointsChanged(obj)
×
174
                },
×
175
        })
176
}
177

178
func (agent *HostAgent) initEndpointSliceInformerFromClient(
179
        kubeClient *kubernetes.Clientset) {
×
180
        agent.initEndpointSliceInformerBase(
×
181
                &cache.ListWatch{
×
182
                        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
×
183
                                obj, err := kubeClient.DiscoveryV1().EndpointSlices(metav1.NamespaceAll).List(context.TODO(), options)
×
184
                                if err != nil {
×
185
                                        agent.log.Fatalf("Failed to list EndpointSlices during initialization of EndpointSliceInformer: %s", err)
×
186
                                }
×
187
                                return obj, err
×
188
                        },
189
                        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
×
190
                                obj, err := kubeClient.DiscoveryV1().EndpointSlices(metav1.NamespaceAll).Watch(context.TODO(), options)
×
191
                                if err != nil {
×
192
                                        agent.log.Fatalf("Failed to watch EndpointSlices during initialization of EndpointSliceInformer: %s", err)
×
193
                                }
×
194
                                return obj, err
×
195
                        },
196
                })
197
}
198

199
func (agent *HostAgent) initEndpointSliceInformerBase(listWatch *cache.ListWatch) {
1✔
200
        agent.endpointSliceInformer = cache.NewSharedIndexInformer(
1✔
201
                listWatch,
1✔
202
                &discovery.EndpointSlice{},
1✔
203
                controller.NoResyncPeriodFunc(),
1✔
204
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
1✔
205
        )
1✔
206
        agent.endpointSliceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
1✔
207
                AddFunc: func(obj interface{}) {
2✔
208
                        agent.endpointSliceChanged(obj)
1✔
209
                },
1✔
210
                UpdateFunc: func(_ interface{}, obj interface{}) {
×
211
                        agent.endpointSliceChanged(obj)
×
212
                },
×
213
                DeleteFunc: func(obj interface{}) {
×
214
                        agent.endpointSliceChanged(obj)
×
215
                },
×
216
        })
217
}
218

219
func (agent *HostAgent) initServiceInformerFromClient(
220
        kubeClient *kubernetes.Clientset) {
×
221
        agent.initServiceInformerBase(
×
222
                &cache.ListWatch{
×
223
                        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
×
224
                                obj, err := kubeClient.CoreV1().Services(metav1.NamespaceAll).List(context.TODO(), options)
×
225
                                if err != nil {
×
226
                                        agent.log.Fatalf("Failed to list Services during initialization of ServiceInformer: %s", err)
×
227
                                }
×
228
                                return obj, err
×
229
                        },
230
                        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
×
231
                                obj, err := kubeClient.CoreV1().Services(metav1.NamespaceAll).Watch(context.TODO(), options)
×
232
                                if err != nil {
×
233
                                        agent.log.Fatalf("Failed to watch Services during initialization of ServiceInformer: %s", err)
×
234
                                }
×
235
                                return obj, err
×
236
                        },
237
                })
238
}
239

240
func (agent *HostAgent) initServiceInformerBase(listWatch *cache.ListWatch) {
1✔
241
        agent.serviceInformer = cache.NewSharedIndexInformer(
1✔
242
                listWatch,
1✔
243
                &v1.Service{},
1✔
244
                controller.NoResyncPeriodFunc(),
1✔
245
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
1✔
246
        )
1✔
247
        agent.serviceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
1✔
248
                AddFunc: func(obj interface{}) {
2✔
249
                        agent.serviceChanged(obj)
1✔
250
                },
1✔
251
                UpdateFunc: func(_ interface{}, obj interface{}) {
1✔
252
                        agent.serviceChanged(obj)
1✔
253
                },
1✔
254
                DeleteFunc: func(obj interface{}) {
1✔
255
                        agent.serviceDeleted(obj)
1✔
256
                },
1✔
257
        })
258
}
259

260
func writeAs(asfile string, as *opflexService) (bool, error) {
1✔
261
        newdata, err := json.MarshalIndent(as, "", "  ")
1✔
262
        if err != nil {
1✔
263
                return true, err
×
264
        }
×
265
        existingdata, err := os.ReadFile(asfile)
1✔
266
        if err == nil && reflect.DeepEqual(existingdata, newdata) {
2✔
267
                return false, nil
1✔
268
        }
1✔
269

270
        err = os.WriteFile(asfile, newdata, 0644)
1✔
271
        return true, err
1✔
272
}
273

274
func serviceLogger(log *logrus.Logger, as *v1.Service) *logrus.Entry {
×
275
        return log.WithFields(logrus.Fields{
×
276
                "namespace": as.ObjectMeta.Namespace,
×
277
                "name":      as.ObjectMeta.Name,
×
278
                "type":      as.Spec.Type,
×
279
        })
×
280
}
×
281

282
func opflexServiceLogger(log *logrus.Logger, as *opflexService) *logrus.Entry {
1✔
283
        return log.WithFields(logrus.Fields{
1✔
284
                "namespace": as.Attributes["namespace"],
1✔
285
                "name":      as.Attributes["name"],
1✔
286
                "uuid":      as.Uuid,
1✔
287
                "tenant":    as.DomainPolicySpace,
1✔
288
                "vrf":       as.DomainName,
1✔
289
        })
1✔
290
}
1✔
291

292
func (agent *HostAgent) syncServices() bool {
1✔
293
        if !agent.syncEnabled {
1✔
294
                return false
×
295
        }
×
296

297
        agent.log.Debug("Syncing services")
1✔
298
        agent.indexMutex.Lock()
1✔
299
        opflexServices := make(map[string]*opflexService)
1✔
300
        for k, v := range agent.opflexServices {
2✔
301
                val := &opflexService{}
1✔
302
                err := util.DeepCopyObj(v, val)
1✔
303
                if err != nil {
1✔
304
                        continue
×
305
                }
306
                opflexServices[k] = val
1✔
307
        }
308
        agent.indexMutex.Unlock()
1✔
309

1✔
310
        files, err := os.ReadDir(agent.config.OpFlexServiceDir)
1✔
311
        if err != nil {
1✔
312
                agent.log.WithFields(
×
313
                        logrus.Fields{"serviceDir": agent.config.OpFlexServiceDir},
×
314
                ).Error("Could not read directory " + err.Error())
×
315
                return true
×
316
        }
×
317
        seen := make(map[string]bool)
1✔
318
        for _, f := range files {
2✔
319
                uuid := f.Name()
1✔
320
                if strings.HasSuffix(uuid, ".as") {
1✔
321
                        uuid = uuid[:len(uuid)-3]
×
322
                } else if strings.HasSuffix(f.Name(), ".service") {
2✔
323
                        uuid = uuid[:len(uuid)-8]
1✔
324
                } else {
2✔
325
                        continue
1✔
326
                }
327

328
                asfile := filepath.Join(agent.config.OpFlexServiceDir, f.Name())
1✔
329
                logger := agent.log.WithFields(
1✔
330
                        logrus.Fields{"Uuid": uuid},
1✔
331
                )
1✔
332
                existing, ok := opflexServices[uuid]
1✔
333
                if ok {
2✔
334
                        wrote, err := writeAs(asfile, existing)
1✔
335
                        if err != nil {
1✔
336
                                opflexServiceLogger(agent.log, existing).
×
337
                                        Error("Error writing service file: ", err)
×
338
                        } else if wrote {
2✔
339
                                opflexServiceLogger(agent.log, existing).Info("Updated service")
1✔
340
                        }
1✔
341
                        seen[uuid] = true
1✔
342
                } else {
1✔
343
                        logger.Info("Removing service")
1✔
344
                        os.Remove(asfile)
1✔
345
                }
1✔
346
        }
347

348
        for _, as := range opflexServices {
2✔
349
                if seen[as.Uuid] {
2✔
350
                        continue
1✔
351
                }
352

353
                opflexServiceLogger(agent.log, as).Info("Adding service")
1✔
354
                asfile :=
1✔
355
                        filepath.Join(agent.config.OpFlexServiceDir, as.Uuid+".service")
1✔
356
                _, err = writeAs(asfile, as)
1✔
357
                if err != nil {
1✔
358
                        opflexServiceLogger(agent.log, as).
×
359
                                Error("Error writing service file: ", err)
×
360
                }
×
361
        }
362

363
        agent.log.Debug("Finished service sync")
1✔
364
        return false
1✔
365
}
366

367
// Must have index lock
368
func (agent *HostAgent) updateServiceDesc(external bool, as *v1.Service, key string) bool {
1✔
369
        if as.Spec.ClusterIP == "None" {
1✔
370
                agent.log.Debugf("ClusterIP of service %s is set to None", as.ObjectMeta.Name)
×
371
                return true
×
372
        }
×
373

374
        ofas := &opflexService{
1✔
375
                Uuid:              string(as.ObjectMeta.UID),
1✔
376
                DomainPolicySpace: agent.config.AciVrfTenant,
1✔
377
                DomainName:        agent.config.AciVrf,
1✔
378
                ServiceMode:       "loadbalancer",
1✔
379
                ServiceMappings:   make([]opflexServiceMapping, 0),
1✔
380
        }
1✔
381
        switch as.Spec.Type {
1✔
382
        case v1.ServiceTypeClusterIP:
×
383
                ofas.ServiceType = "clusterIp"
×
384
        case v1.ServiceTypeNodePort:
×
385
                ofas.ServiceType = "nodePort"
×
386
        case v1.ServiceTypeLoadBalancer:
1✔
387
                ofas.ServiceType = "loadBalancer"
1✔
388
        case v1.ServiceTypeExternalName:
×
389
                ofas.ServiceType = "externalName"
×
390
        }
391

392
        if external {
2✔
393
                if agent.config.UplinkIface == "" ||
1✔
394
                        agent.serviceEp.Ipv4 == nil ||
1✔
395
                        agent.serviceEp.Mac == "" {
2✔
396
                        return false
1✔
397
                }
1✔
398

399
                ofas.InterfaceName = agent.config.UplinkIface
1✔
400
                ofas.InterfaceVlan = uint16(agent.config.ServiceVlan)
1✔
401
                // Directly using the Uplink MacAdress instead of using Opflex injected mac
1✔
402
                ofas.ServiceMac = agent.config.UplinkMacAdress
1✔
403
                ofas.InterfaceIp = agent.serviceEp.Ipv4.String()
1✔
404
                ofas.Uuid += "-external"
1✔
405
        }
406
        hasValidMapping := false
1✔
407
        for ix := range as.Spec.Ports {
2✔
408
                hasValidMapping = agent.serviceEndPoints.SetOpflexService(ofas, as, external, key, &as.Spec.Ports[ix])
1✔
409
        }
1✔
410

411
        id := fmt.Sprintf("%s_%s", as.ObjectMeta.Namespace, as.ObjectMeta.Name)
1✔
412
        ofas.Attributes = as.ObjectMeta.Labels
1✔
413
        if ofas.Attributes == nil {
1✔
414
                ofas.Attributes = make(map[string]string)
×
415
        }
×
416
        ofas.Attributes["namespace"] = as.ObjectMeta.Namespace
1✔
417
        ofas.Attributes["name"] = as.ObjectMeta.Name
1✔
418
        ofas.Attributes["service-name"] = id
1✔
419

1✔
420
        existing, ok := agent.opflexServices[ofas.Uuid]
1✔
421
        if hasValidMapping {
2✔
422
                if (ok && !reflect.DeepEqual(existing, ofas)) || !ok {
2✔
423
                        agent.opflexServices[ofas.Uuid] = ofas
1✔
424
                        // Check matching oc serivce and create a extra service file.
1✔
425
                        // This Change is specfic to Openshfit domain
1✔
426
                        agent.setOpenShfitService(as, external, ofas)
1✔
427
                }
1✔
428
                return true
1✔
429
        }
430
        if ok {
1✔
431
                delete(agent.opflexServices, ofas.Uuid)
×
432
                return true
×
433
        }
×
434

435
        return false
1✔
436
}
437

438
// must have index lock
439
func (agent *HostAgent) doUpdateService(key string) {
1✔
440
        asobj, exists, err := agent.serviceInformer.GetStore().GetByKey(key)
1✔
441
        if err != nil {
1✔
442
                agent.log.Error("Could not lookup service for " +
×
443
                        key + ": " + err.Error())
×
444
                return
×
445
        }
×
446
        if !exists || asobj == nil {
2✔
447
                return
1✔
448
        }
1✔
449
        as := asobj.(*v1.Service)
1✔
450
        doSync := false
1✔
451
        doSync = agent.updateServiceDesc(false, as, key) || doSync
1✔
452
        doSync = agent.updateServiceDesc(true, as, key) || doSync
1✔
453
        if doSync {
2✔
454
                agent.scheduleSyncServices()
1✔
455
                agent.updateEpFileWithClusterIp(as, false)
1✔
456
                agent.scheduleSyncEps()
1✔
457
        }
1✔
458
}
459

460
func (agent *HostAgent) endpointsChanged(obj interface{}) {
1✔
461
        agent.indexMutex.Lock()
1✔
462
        defer agent.indexMutex.Unlock()
1✔
463

1✔
464
        endpoints := obj.(*v1.Endpoints)
1✔
465
        agent.log.Debugf("Endpoint changed: name=%s namespace=%s",
1✔
466
                endpoints.ObjectMeta.Name, endpoints.ObjectMeta.Namespace)
1✔
467

1✔
468
        key, err := cache.MetaNamespaceKeyFunc(endpoints)
1✔
469
        if err != nil {
1✔
470
                agent.log.Error("Could not create key:" + err.Error())
×
471
                return
×
472
        }
×
473
        agent.doUpdateService(key)
1✔
474
}
475
func getServiceKey(endPointSlice *discovery.EndpointSlice) (string, bool) {
1✔
476
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
1✔
477
        if !ok {
1✔
478
                return "", false
×
479
        }
×
480
        return endPointSlice.ObjectMeta.Namespace + "/" + serviceName, true
1✔
481
}
482

483
func (agent *HostAgent) endpointSliceChanged(obj interface{}) {
1✔
484
        agent.indexMutex.Lock()
1✔
485
        defer agent.indexMutex.Unlock()
1✔
486
        endpointslice := obj.(*discovery.EndpointSlice)
1✔
487
        agent.log.Debugf("endpointslice changed: name=%s namespace=%s",
1✔
488
                endpointslice.ObjectMeta.Name, endpointslice.ObjectMeta.Namespace)
1✔
489
        servicekey, ok := getServiceKey(endpointslice)
1✔
490
        if !ok {
1✔
491
                return
×
492
        }
×
493
        agent.doUpdateService(servicekey)
1✔
494
}
495

496
func (agent *HostAgent) serviceChanged(obj interface{}) {
1✔
497
        agent.indexMutex.Lock()
1✔
498
        defer agent.indexMutex.Unlock()
1✔
499

1✔
500
        as := obj.(*v1.Service)
1✔
501
        agent.log.Debugf("Service changed: name=%s namespace=%s",
1✔
502
                as.ObjectMeta.Name, as.ObjectMeta.Namespace)
1✔
503

1✔
504
        key, err := cache.MetaNamespaceKeyFunc(as)
1✔
505
        if err != nil {
1✔
506
                serviceLogger(agent.log, as).
×
507
                        Error("Could not create service object key:" + err.Error())
×
508
                return
×
509
        }
×
510
        agent.doUpdateService(key)
1✔
511
        agent.handleObjectUpdateForSnat(obj)
1✔
512
}
513

514
func (agent *HostAgent) serviceDeleted(obj interface{}) {
1✔
515
        agent.indexMutex.Lock()
1✔
516
        defer agent.indexMutex.Unlock()
1✔
517

1✔
518
        as, isService := obj.(*v1.Service)
1✔
519
        if !isService {
1✔
520
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
521
                if !ok {
×
522
                        agent.log.Error("Received unexpected object: ", obj)
×
523
                        return
×
524
                }
×
525
                as, ok = deletedState.Obj.(*v1.Service)
×
526
                if !ok {
×
527
                        agent.log.Error("DeletedFinalStateUnknown contained non-Services object: ", deletedState.Obj)
×
528
                        return
×
529
                }
×
530
        }
531
        agent.log.Debugf("Service deleted: name=%s namespace=%s",
1✔
532
                as.ObjectMeta.Name, as.ObjectMeta.Namespace)
1✔
533

1✔
534
        u := string(as.ObjectMeta.UID)
1✔
535
        if _, ok := agent.opflexServices[u]; ok {
2✔
536
                delete(agent.opflexServices, u)
1✔
537
                delete(agent.opflexServices, u+"-external")
1✔
538
                for _, v := range agent.ocServices {
2✔
539
                        if v.Name == as.ObjectMeta.Name &&
1✔
540
                                v.Namespace == as.ObjectMeta.Namespace {
1✔
541
                                delete(agent.opflexServices, u+"-"+v.Name)
×
542
                        }
×
543
                }
544
                agent.scheduleSyncServices()
1✔
545
                agent.deleteServIpFromEp(u)
1✔
546
                agent.scheduleSyncEps()
1✔
547
        }
548
        agent.handleObjectDeleteForSnat(obj)
1✔
549
}
550

551
func (agent *HostAgent) updateAllServices() {
1✔
552
        if agent.serviceInformer == nil {
1✔
553
                return
×
554
        }
×
555
        store := agent.serviceInformer.GetStore()
1✔
556
        if store == nil {
1✔
557
                return
×
558
        }
×
559
        keys := agent.serviceInformer.GetStore().ListKeys()
1✔
560
        if keys == nil {
1✔
561
                return
×
562
        }
×
563

564
        agent.indexMutex.Lock()
1✔
565
        defer agent.indexMutex.Unlock()
1✔
566
        for _, key := range keys {
1✔
567
                agent.doUpdateService(key)
×
568
        }
×
569
}
570

571
// This API is get the OpenShift InfrastructreIp's
572
func (agent *HostAgent) getInfrastucreIp(serviceName string) string {
×
573
        if _, ok := Version[agent.config.Flavor]; ok {
×
574
                if serviceName == RouterInternalDefault {
×
575
                        return agent.config.InstallerProvlbIp
×
576
                }
×
577
                return ""
×
578
        }
579
        infraStructureInfo := &configv1.Infrastructure{
×
580
                TypeMeta:   metav1.TypeMeta{APIVersion: configv1.GroupVersion.String(), Kind: "Infrastructure"},
×
581
                ObjectMeta: metav1.ObjectMeta{Name: "cluster"},
×
582
        }
×
583
        cfg, _ := config.GetConfig()
×
584
        scheme := runtime.NewScheme()
×
585
        scheme.AddKnownTypes(configv1.SchemeGroupVersion, &configv1.Infrastructure{})
×
586
        scheme.AddKnownTypes(configv1.SchemeGroupVersion, &metav1.GetOptions{})
×
587
        rclient, err := client.New(cfg, client.Options{Scheme: scheme})
×
588
        if err != nil {
×
589
                agent.log.Error(err.Error())
×
590
                return ""
×
591
        }
×
592
        if rclient == nil {
×
593
                agent.log.Error("client is nil")
×
594
                return ""
×
595
        }
×
596
        err = rclient.Get(context.TODO(), types.NamespacedName{
×
597
                Name: "cluster"}, infraStructureInfo)
×
598
        if err != nil {
×
599
                agent.log.Error(err.Error())
×
600
                return ""
×
601
        }
×
602
        if infraStructureInfo.Status.Platform == configv1.OpenStackPlatformType {
×
603
                if infraStructureInfo.Status.PlatformStatus != nil &&
×
604
                        infraStructureInfo.Status.PlatformStatus.OpenStack != nil {
×
605
                        if serviceName == RouterInternalDefault {
×
606
                                return infraStructureInfo.Status.PlatformStatus.OpenStack.IngressIP
×
607
                        }
×
608
                }
609
        }
610
        return ""
×
611
}
612

613
func (agent *HostAgent) setOpenShfitService(as *v1.Service, external bool, ofas *opflexService) {
1✔
614
        if agent.config.AciVmmDomainType == "OpenShift" {
1✔
615
                if !external {
×
616
                        for _, v := range agent.ocServices {
×
617
                                // Check for Namespace is equal
×
618
                                if v.Namespace != as.ObjectMeta.Namespace {
×
619
                                        continue
×
620
                                }
621
                                // Check Service Name is equal
622
                                if v.Name != as.ObjectMeta.Name {
×
623
                                        continue
×
624
                                }
625
                                InfraIp := agent.getInfrastucreIp(as.ObjectMeta.Name)
×
626
                                agent.log.Debug("InfraIp: ", InfraIp)
×
627
                                if InfraIp == "" {
×
628
                                        continue
×
629
                                }
630
                                ocas := &opflexService{
×
631
                                        Uuid:              string(as.ObjectMeta.UID),
×
632
                                        DomainPolicySpace: agent.config.AciVrfTenant,
×
633
                                        DomainName:        agent.config.AciVrf,
×
634
                                        ServiceMode:       "loadbalancer",
×
635
                                        ServiceMappings:   make([]opflexServiceMapping, 0),
×
636
                                }
×
637
                                ocas.Uuid = ocas.Uuid + "-" + as.ObjectMeta.Name
×
638
                                for _, val := range ofas.ServiceMappings {
×
639
                                        val.ServiceIp = InfraIp
×
640
                                        ocas.ServiceMappings = append(ocas.ServiceMappings, val)
×
641
                                }
×
642
                                ocas.ServiceType = ofas.ServiceType
×
643
                                ocas.Attributes = ofas.Attributes
×
644
                                agent.opflexServices[ocas.Uuid] = ocas
×
645
                        }
646
                }
647
        }
648
}
649

650
func (sep *serviceEndpoint) SetOpflexService(ofas *opflexService, as *v1.Service,
651
        external bool, key string, sp *v1.ServicePort) bool {
1✔
652
        agent := sep.agent
1✔
653
        endpointsobj, exists, err :=
1✔
654
                agent.endpointsInformer.GetStore().GetByKey(key)
1✔
655
        if err != nil {
1✔
656
                agent.log.Error("Could not lookup endpoints for " +
×
657
                        key + ": " + err.Error())
×
658
                return false
×
659
        }
×
660
        if !exists || endpointsobj == nil {
1✔
661
                agent.log.Debugf("no endpoints for service %s/%s", as.Namespace, as.Name)
×
662
                return false
×
663
        }
×
664
        endpoints := endpointsobj.(*v1.Endpoints)
1✔
665
        hasValidMapping := false
1✔
666

1✔
667
        type void struct{}
1✔
668
        var ipexists void
1✔
669
        clusterIPs := make(map[string]void)
1✔
670
        clusterIPs[as.Spec.ClusterIP] = ipexists
1✔
671
        clusterIPsField := reflect.ValueOf(as.Spec).FieldByName("ClusterIPs")
1✔
672
        if clusterIPsField.IsValid() {
2✔
673
                for _, ip := range as.Spec.ClusterIPs {
1✔
674
                        clusterIPs[ip] = ipexists
×
675
                }
×
676
        }
677

678
        for clusterIP := range clusterIPs {
2✔
679
                for _, e := range endpoints.Subsets {
2✔
680
                        if len(e.Addresses) == 0 {
1✔
681
                                continue
×
682
                        }
683
                        parsedClusterIp := net.ParseIP(clusterIP)
1✔
684
                        parsedPodIp := net.ParseIP(e.Addresses[0].IP)
1✔
685

1✔
686
                        if parsedClusterIp == nil || parsedPodIp == nil {
1✔
687
                                agent.log.Info("Not a valid IP address..", parsedClusterIp, parsedPodIp)
×
688
                                continue
×
689
                        }
690
                        if parsedClusterIp.To4() != nil && parsedPodIp.To4() != nil {
2✔
691
                                agent.log.Info("Both are IPv4 addresses..", parsedClusterIp, parsedPodIp, "Adding to map..")
1✔
692
                        } else if parsedClusterIp.To4() == nil && parsedPodIp.To4() == nil {
3✔
693
                                agent.log.Info("Both are IPv6 addresses..", parsedClusterIp, parsedPodIp, "Adding to map..")
1✔
694
                        } else {
1✔
695
                                continue
×
696
                        }
697
                        for _, p := range e.Ports {
2✔
698
                                if p.Protocol != sp.Protocol {
1✔
699
                                        continue
×
700
                                }
701
                                if p.Name != sp.Name {
1✔
702
                                        continue
×
703
                                }
704

705
                                sm := &opflexServiceMapping{
1✔
706
                                        ServicePort:  uint16(sp.Port),
1✔
707
                                        ServiceProto: strings.ToLower(string(sp.Protocol)),
1✔
708
                                        NextHopIps:   make([]string, 0),
1✔
709
                                        NextHopPort:  uint16(p.Port),
1✔
710
                                        Conntrack:    true,
1✔
711
                                        NodePort:     uint16(sp.NodePort),
1✔
712
                                }
1✔
713

1✔
714
                                if external {
2✔
715
                                        if as.Spec.Type == v1.ServiceTypeLoadBalancer &&
1✔
716
                                                len(as.Status.LoadBalancer.Ingress) > 0 {
2✔
717
                                                for _, ip := range as.Status.LoadBalancer.Ingress {
2✔
718
                                                        LBIp := net.ParseIP(ip.IP)
1✔
719
                                                        if (LBIp.To4() != nil) == (parsedPodIp.To4() != nil) {
2✔
720
                                                                sm.ServiceIp = ip.IP
1✔
721
                                                                break
1✔
722
                                                        }
723
                                                }
724
                                        }
725
                                } else {
1✔
726
                                        sm.ServiceIp = clusterIP
1✔
727
                                }
1✔
728
                                sm.SessionAffinity = getSessionAffinity(as)
1✔
729
                                for _, a := range e.Addresses {
2✔
730
                                        if !external ||
1✔
731
                                                (a.NodeName != nil && *a.NodeName == agent.config.NodeName) {
2✔
732
                                                sm.NextHopIps = append(sm.NextHopIps, a.IP)
1✔
733
                                        }
1✔
734
                                }
735
                                if sm.ServiceIp != "" && len(sm.NextHopIps) > 0 {
2✔
736
                                        hasValidMapping = true
1✔
737
                                }
1✔
738

739
                                ofas.ServiceMappings = append(ofas.ServiceMappings, *sm)
1✔
740
                        }
741
                }
742
        }
743
        return hasValidMapping
1✔
744
}
745

746
func getSessionAffinity(as *v1.Service) *opflexSessionAffinityConfig {
1✔
747
        if as.Spec.SessionAffinityConfig != nil && as.Spec.SessionAffinity == "ClientIP" {
2✔
748
                config := as.Spec.SessionAffinityConfig
1✔
749
                if config.ClientIP != nil && config.ClientIP.TimeoutSeconds != nil {
2✔
750
                        return &opflexSessionAffinityConfig{ClientIP: opflexClientIPConfig{TimeoutSeconds: *config.ClientIP.TimeoutSeconds}}
1✔
751
                } else {
1✔
752
                        return &opflexSessionAffinityConfig{ClientIP: opflexClientIPConfig{TimeoutSeconds: DefaultSessionAffinityTimer}}
×
753
                }
×
754
        }
755
        return &opflexSessionAffinityConfig{ClientIP: opflexClientIPConfig{TimeoutSeconds: TempSessionAffinityTimer}}
×
756
}
757

758
func (seps *serviceEndpointSlice) SetOpflexService(ofas *opflexService, as *v1.Service,
759
        external bool, key string, sp *v1.ServicePort) bool {
1✔
760
        agent := seps.agent
1✔
761
        hasValidMapping := false
1✔
762
        var endpointSlices []*discovery.EndpointSlice
1✔
763
        label := map[string]string{discovery.LabelServiceName: as.ObjectMeta.Name}
1✔
764
        selector := labels.SelectorFromSet(labels.Set(label))
1✔
765
        cache.ListAllByNamespace(agent.endpointSliceInformer.GetIndexer(), as.ObjectMeta.Namespace, selector,
1✔
766
                func(endpointSliceobj interface{}) {
2✔
767
                        endpointSlices = append(endpointSlices, endpointSliceobj.(*discovery.EndpointSlice))
1✔
768
                })
1✔
769

770
        type void struct{}
1✔
771
        var ipexists void
1✔
772
        clusterIPs := make(map[string]void)
1✔
773
        clusterIPs[as.Spec.ClusterIP] = ipexists
1✔
774
        clusterIPsField := reflect.ValueOf(as.Spec).FieldByName("ClusterIPs")
1✔
775
        if clusterIPsField.IsValid() {
2✔
776
                for _, ip := range as.Spec.ClusterIPs {
1✔
777
                        clusterIPs[ip] = ipexists
×
778
                }
×
779
        }
780

781
        for clusterIP := range clusterIPs {
2✔
782
                for _, endpointSlice := range endpointSlices {
2✔
783
                        if !(len(endpointSlice.Endpoints) > 0 && len(endpointSlice.Endpoints[0].Addresses) > 0) {
1✔
784
                                continue
×
785
                        }
786
                        parsedClusterIp := net.ParseIP(clusterIP)
1✔
787
                        parsedPodIp := net.ParseIP(endpointSlice.Endpoints[0].Addresses[0])
1✔
788
                        if parsedClusterIp == nil || parsedPodIp == nil {
1✔
789
                                agent.log.Info("Not a valid IP address..", parsedClusterIp, parsedPodIp)
×
790
                                continue
×
791
                        }
792
                        if parsedClusterIp.To4() != nil && parsedPodIp.To4() != nil {
2✔
793
                                agent.log.Info("Both are IPv4 addresses..", parsedClusterIp, parsedPodIp, "Adding to map..")
1✔
794
                        } else if parsedClusterIp.To4() == nil && parsedPodIp.To4() == nil {
3✔
795
                                agent.log.Info("Both are IPv6 addresses..", parsedClusterIp, parsedPodIp, "Adding to map..")
1✔
796
                        } else {
1✔
797
                                continue
×
798
                        }
799
                        for _, p := range endpointSlice.Ports {
2✔
800
                                if p.Protocol != nil && *p.Protocol != sp.Protocol {
1✔
801
                                        continue
×
802
                                }
803

804
                                if p.Name != nil && *p.Name != sp.Name {
1✔
805
                                        continue
×
806
                                }
807

808
                                sm := &opflexServiceMapping{
1✔
809
                                        ServicePort:  uint16(sp.Port),
1✔
810
                                        ServiceProto: strings.ToLower(string(sp.Protocol)),
1✔
811
                                        NextHopIps:   make([]string, 0),
1✔
812
                                        NextHopPort:  uint16(*p.Port),
1✔
813
                                        Conntrack:    true,
1✔
814
                                        NodePort:     uint16(sp.NodePort),
1✔
815
                                }
1✔
816

1✔
817
                                if external {
2✔
818
                                        if as.Spec.Type == v1.ServiceTypeLoadBalancer &&
1✔
819
                                                len(as.Status.LoadBalancer.Ingress) > 0 {
2✔
820
                                                for _, ip := range as.Status.LoadBalancer.Ingress {
2✔
821
                                                        LBIp := net.ParseIP(ip.IP)
1✔
822
                                                        if (LBIp.To4() != nil) == (parsedPodIp.To4() != nil) {
2✔
823
                                                                sm.ServiceIp = ip.IP
1✔
824
                                                                break
1✔
825
                                                        }
826
                                                }
827
                                        }
828
                                } else {
1✔
829
                                        sm.ServiceIp = clusterIP
1✔
830
                                }
1✔
831
                                nexthops := make(map[string][]string)
1✔
832
                                var nodeZone string
1✔
833
                                for _, e := range endpointSlice.Endpoints {
2✔
834
                                        for _, a := range e.Addresses {
2✔
835
                                                if !external || (e.NodeName != nil && *e.NodeName == agent.config.NodeName) {
2✔
836
                                                        obj, exists, err := agent.nodeInformer.GetStore().GetByKey(agent.config.NodeName)
1✔
837
                                                        if err != nil {
1✔
838
                                                                agent.log.Error("Could not lookup node: ", err)
×
839
                                                                continue
×
840
                                                        }
841
                                                        if !exists && obj == nil {
1✔
842
                                                                agent.log.Error("Object nil")
×
843
                                                                continue
×
844
                                                        }
845
                                                        node := obj.(*v1.Node)
1✔
846
                                                        // Services need an annotation to inform the
1✔
847
                                                        // endpointslice controller to add hints
1✔
848
                                                        // Currently-1.22 only zones are used as hints.
1✔
849
                                                        // https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/2433-topology-aware-hints
1✔
850
                                                        var hintsEnabled = false
1✔
851
                                                        if val1, ok1 := as.ObjectMeta.Annotations["service.kubernetes.io/topology-aware-routing"]; ok1 && (strings.ToLower(val1) == "auto") {
1✔
852
                                                                hintsEnabled = true
×
853
                                                        }
×
854
                                                        // annotation deprecated in 1.27 but still supported
855
                                                        if val2, ok2 := as.ObjectMeta.Annotations["service.kubernetes.io/topology-aware-hints"]; ok2 && (strings.ToLower(val2) == "auto") {
1✔
856
                                                                hintsEnabled = true
×
857
                                                        }
×
858
                                                        // annotation as of 1.27
859
                                                        if val3, ok3 := as.ObjectMeta.Annotations[v1.AnnotationTopologyMode]; ok3 && (strings.ToLower(val3) == "auto") {
2✔
860
                                                                hintsEnabled = true
1✔
861
                                                        }
1✔
862
                                                        zone, zoneOk := node.ObjectMeta.Labels[v1.LabelTopologyZone]
1✔
863
                                                        nodeZone = zone
1✔
864
                                                        if !external && zoneOk && hintsEnabled && e.Hints != nil {
2✔
865
                                                                for _, hintZone := range e.Hints.ForZones {
2✔
866
                                                                        if nodeZone == hintZone.Name && *e.Conditions.Ready {
2✔
867
                                                                                nexthops["topologyawarehints"] =
1✔
868
                                                                                        append(nexthops["topologyawarehints"], a)
1✔
869
                                                                        }
1✔
870
                                                                }
871
                                                        } else if *e.Conditions.Ready {
2✔
872
                                                                nexthops["any"] = append(nexthops["any"], a)
1✔
873
                                                        }
1✔
874
                                                }
875
                                        }
876
                                }
877
                                // Select the high priority keys as datapath doesn't have support for fallback
878
                                if _, ok := nexthops["topologyawarehints"]; ok {
2✔
879
                                        sm.NextHopIps = append(sm.NextHopIps, nexthops["topologyawarehints"]...)
1✔
880
                                        agent.log.Info("NextHopIps are", sm.NextHopIps)
1✔
881
                                        agent.log.Debugf("Topology matching hint: %s Nexthops: %s", nodeZone, sm.NextHopIps)
1✔
882
                                } else {
2✔
883
                                        sm.NextHopIps = append(sm.NextHopIps, nexthops["any"]...)
1✔
884
                                        agent.log.Info("NextHopIps are", sm.NextHopIps)
1✔
885
                                }
1✔
886
                                if sm.ServiceIp != "" && len(sm.NextHopIps) > 0 {
2✔
887
                                        hasValidMapping = true
1✔
888
                                }
1✔
889
                                sm.SessionAffinity = getSessionAffinity(as)
1✔
890
                                ofas.ServiceMappings = append(ofas.ServiceMappings, *sm)
1✔
891
                        }
892
                }
893
        }
894
        return hasValidMapping
1✔
895
}
896

897
func (agent *HostAgent) updateEpFileWithClusterIp(as *v1.Service, deleted bool) {
1✔
898
        suid := string(as.ObjectMeta.UID)
1✔
899
        ofas, ok := agent.opflexServices[suid]
1✔
900
        var dummy struct{}
1✔
901
        if ok {
2✔
902
                if as.Spec.ClusterIP == "None" {
1✔
903
                        return
×
904
                }
×
905
                podKeys := agent.getPodKeysFromSm(ofas.ServiceMappings)
1✔
906
                current := make(map[string]struct{})
1✔
907
                for _, key := range podKeys {
2✔
908
                        obj, exists, err := agent.podInformer.GetStore().GetByKey(key)
1✔
909
                        if err == nil && exists && (obj != nil) {
2✔
910
                                pod := obj.(*v1.Pod)
1✔
911
                                if agent.config.NodeName != pod.Spec.NodeName {
1✔
912
                                        continue
×
913
                                }
914
                                poduid := string(pod.ObjectMeta.UID)
1✔
915
                                if _, sok := agent.servicetoPodUids[suid]; !sok {
2✔
916
                                        agent.servicetoPodUids[suid] = make(map[string]struct{})
1✔
917
                                }
1✔
918
                                agent.servicetoPodUids[suid][poduid] = dummy
1✔
919
                                if _, podok := agent.podtoServiceUids[poduid]; !podok {
2✔
920
                                        agent.podtoServiceUids[poduid] = make(map[string][]string)
1✔
921
                                }
1✔
922

923
                                type void struct{}
1✔
924
                                var ipexists void
1✔
925
                                clusterIPs := make(map[string]void)
1✔
926
                                clusterIPs[as.Spec.ClusterIP] = ipexists
1✔
927
                                clusterIPsField := reflect.ValueOf(as.Spec).FieldByName("ClusterIPs")
1✔
928
                                if clusterIPsField.IsValid() {
2✔
929
                                        for _, ip := range as.Spec.ClusterIPs {
1✔
930
                                                clusterIPs[ip] = ipexists
×
931
                                        }
×
932
                                }
933

934
                                var listClusterIPs []string
1✔
935
                                for ip := range clusterIPs {
2✔
936
                                        listClusterIPs = append(listClusterIPs, ip)
1✔
937
                                }
1✔
938
                                agent.podtoServiceUids[poduid][suid] = listClusterIPs
1✔
939
                                agent.log.Info("EpUpdated: ", poduid, " with ClusterIp: ", listClusterIPs)
1✔
940
                                current[poduid] = dummy
1✔
941
                        }
942
                }
943
                // reconcile with the current pods matching the service
944
                // if there is any stale info remove that from service matching the pods
945
                // update the revese map for the pod to service
946
                poduids := agent.servicetoPodUids[suid]
1✔
947
                for id := range poduids {
2✔
948
                        if _, ok := current[id]; !ok {
1✔
949
                                delete(agent.servicetoPodUids[suid], id)
×
950
                                delete(agent.podtoServiceUids[id], suid)
×
951
                                if len(agent.podtoServiceUids[id]) == 0 {
×
952
                                        delete(agent.podtoServiceUids, id)
×
953
                                }
×
954
                        }
955
                }
956
        } else {
×
957
                agent.deleteServIpFromEp(suid)
×
958
        }
×
959
}
960

961
func (agent *HostAgent) getPodKeysFromSm(sm []opflexServiceMapping) []string {
1✔
962
        var podkeys []string
1✔
963
        if len(sm) > 0 {
2✔
964
                podIps := sm[0].NextHopIps
1✔
965
                for _, ip := range podIps {
2✔
966
                        podkey, ok := agent.podIpToName[ip]
1✔
967
                        if ok {
2✔
968
                                podkeys = append(podkeys, podkey)
1✔
969
                        }
1✔
970
                }
971
        }
972
        return podkeys
1✔
973
}
974

975
func (agent *HostAgent) getServiceIPs(poduid string) []string {
1✔
976
        var ips []string
1✔
977
        agent.indexMutex.Lock()
1✔
978
        v, ok := agent.podtoServiceUids[poduid]
1✔
979
        if ok {
2✔
980
                for _, service_ips := range v {
2✔
981
                        ips = append(ips, service_ips...)
1✔
982
                }
1✔
983
        }
984
        agent.indexMutex.Unlock()
1✔
985
        return ips
1✔
986
}
987

988
func (agent *HostAgent) deleteServIpFromEp(suid string) {
1✔
989
        v, ok := agent.servicetoPodUids[suid]
1✔
990
        if ok {
2✔
991
                for poduid := range v {
2✔
992
                        if _, ok := agent.podtoServiceUids[poduid]; ok {
2✔
993
                                delete(agent.podtoServiceUids[poduid], suid)
1✔
994
                                if len(agent.podtoServiceUids[poduid]) == 0 {
2✔
995
                                        delete(agent.podtoServiceUids, poduid)
1✔
996
                                }
1✔
997
                        }
998
                }
999
                delete(agent.servicetoPodUids, suid)
1✔
1000
        }
1001
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc