• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11188

27 Oct 2025 11:37AM UTC coverage: 64.92% (+0.09%) from 64.828%
11188

Pull #1607

travis-pro

web-flow
Merge 2868fb55f into 178f2f987
Pull Request #1607: [mmr-6.1.1] For NodePort service, fetch pod ip from nat rule.

13369 of 20593 relevant lines covered (64.92%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.22
/pkg/hostagent/services.go
1
// Copyright 2016 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package hostagent
16

17
import (
18
        "context"
19
        "encoding/json"
20
        "fmt"
21
        "net"
22
        "os"
23
        "path/filepath"
24
        "reflect"
25
        "strings"
26

27
        "github.com/noironetworks/aci-containers/pkg/util"
28
        configv1 "github.com/openshift/api/config/v1"
29
        "github.com/sirupsen/logrus"
30
        v1 "k8s.io/api/core/v1"
31
        discovery "k8s.io/api/discovery/v1"
32
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
        "k8s.io/apimachinery/pkg/labels"
34
        "k8s.io/apimachinery/pkg/runtime"
35
        "k8s.io/apimachinery/pkg/types"
36
        "k8s.io/apimachinery/pkg/watch"
37
        "k8s.io/client-go/kubernetes"
38
        "k8s.io/client-go/tools/cache"
39
        "k8s.io/kubernetes/pkg/controller"
40
        "sigs.k8s.io/controller-runtime/pkg/client"
41
        "sigs.k8s.io/controller-runtime/pkg/client/config"
42
)
43

44
type opflexServiceMapping struct {
45
        ServiceIp    string `json:"service-ip,omitempty"`
46
        ServiceProto string `json:"service-proto,omitempty"`
47
        ServicePort  uint16 `json:"service-port,omitempty"`
48

49
        NextHopIps            []string `json:"next-hop-ips"`
50
        TerminatingNextHopIps []string `json:"terminating-next-hop-ips"`
51
        NextHopPort           uint16   `json:"next-hop-port,omitempty"`
52

53
        Conntrack       bool                         `json:"conntrack-enabled"`
54
        ConntrackNat    bool                         `json:"conntrack-nat,omitempty"`
55
        NodePort        uint16                       `json:"node-port,omitempty"`
56
        SessionAffinity *opflexSessionAffinityConfig `json:"session-affinity,omitempty"`
57
}
58

59
type opflexService struct {
60
        Uuid string `json:"uuid"`
61

62
        DomainPolicySpace string `json:"domain-policy-space,omitempty"`
63
        DomainName        string `json:"domain-name,omitempty"`
64

65
        ServiceMode   string `json:"service-mode,omitempty"`
66
        ServiceMac    string `json:"service-mac,omitempty"`
67
        InterfaceName string `json:"interface-name,omitempty"`
68
        InterfaceIp   string `json:"interface-ip,omitempty"`
69
        InterfaceVlan uint16 `json:"interface-vlan,omitempty"`
70
        ServiceType   string `json:"service-type,omitempty"`
71

72
        ServiceMappings []opflexServiceMapping `json:"service-mapping"`
73

74
        Attributes map[string]string `json:"attributes,omitempty"`
75
}
76

77
type opflexSessionAffinityConfig struct {
78
        // clientIP contains the configurations of Client IP based session affinity.
79
        ClientIP opflexClientIPConfig `json:"client-ip,omitempty"`
80
}
81

82
// opflexClientIPConfig represents the configurations of Client IP based
// session affinity.
type opflexClientIPConfig struct {
	// TimeoutSeconds specifies the seconds of ClientIP type session sticky time.
	// The value must be >0 && <=86400(for 1 day) if ServiceAffinity == "ClientIP".
	// Default value is 10800(for 3 hours).
	// A zero value is omitted from the marshalled JSON.
	TimeoutSeconds int32 `json:"timeout-seconds,omitempty"`
}
89

90
// Package-level constants used by the service handling code.
const (
	// DefaultSessionAffinityTimer is the default session affinity
	// timeout: 10800 (3 hours).
	DefaultSessionAffinityTimer = 10800
	// TempSessionAffinityTimer is a session affinity timer value of 1.
	TempSessionAffinityTimer = 1

	// RouterInternalDefault is the name of the OpenShift service.
	RouterInternalDefault string = "router-internal-default"

	// OpenShiftIngressNs is the namespace of the OpenShift service.
	OpenShiftIngressNs string = "openshift-ingress"
)
105

106
// opflexOcService identifies an OpenShift service by its name and the
// namespace it lives in.
type opflexOcService struct {
	Name      string
	Namespace string
}
111

112
// Version is the set of flavor names that getInfrastucreIp looks up with the
// agent's configured flavor. For a flavor present here, the infrastructure
// IP comes from the agent configuration rather than from the cluster
// Infrastructure object.
var Version = map[string]bool{
	// Bare metal.
	"openshift-4.6-baremetal":  true,
	"openshift-4.7-baremetal":  true,
	"openshift-4.8-baremetal":  true,
	"openshift-4.9-baremetal":  true,
	"openshift-4.10-baremetal": true,
	"openshift-4.11-baremetal": true,
	"openshift-4.12-baremetal": true,
	"openshift-4.13-baremetal": true,
	"openshift-4.14-baremetal": true,
	"openshift-4.15-baremetal": true,
	"openshift-4.16-baremetal": true,
	"openshift-4.17-baremetal": true,
	"openshift-4.18-baremetal": true,
	"openshift-4.19-baremetal": true,

	// Agent-based bare metal.
	"openshift-4.14-agent-based-baremetal": true,
	"openshift-4.15-agent-based-baremetal": true,
	"openshift-4.16-agent-based-baremetal": true,
	"openshift-4.17-agent-based-baremetal": true,
	"openshift-4.18-agent-based-baremetal": true,
	"openshift-4.19-agent-based-baremetal": true,

	// ESX.
	"openshift-4.4-esx":  true,
	"openshift-4.5-esx":  true,
	"openshift-4.6-esx":  true,
	"openshift-4.7-esx":  true,
	"openshift-4.8-esx":  true,
	"openshift-4.9-esx":  true,
	"openshift-4.10-esx": true,
	"openshift-4.11-esx": true,
	"openshift-4.12-esx": true,
	"openshift-4.13-esx": true,
	"openshift-4.14-esx": true,
	"openshift-4.15-esx": true,
	"openshift-4.16-esx": true,
	"openshift-4.17-esx": true,
	"openshift-4.18-esx": true,
	"openshift-4.19-esx": true,

	// Agent-based ESX.
	"openshift-4.14-agent-based-esx": true,
	"openshift-4.15-agent-based-esx": true,
	"openshift-4.16-agent-based-esx": true,
	"openshift-4.17-agent-based-esx": true,
	"openshift-4.18-agent-based-esx": true,
	"openshift-4.19-agent-based-esx": true,
}
156

157
func (agent *HostAgent) initEndpointsInformerFromClient(
158
        kubeClient *kubernetes.Clientset) {
×
159
        agent.initEndpointsInformerBase(
×
160
                &cache.ListWatch{
×
161
                        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
×
162
                                obj, err := kubeClient.CoreV1().Endpoints(metav1.NamespaceAll).List(context.TODO(), options)
×
163
                                if err != nil {
×
164
                                        agent.log.Fatalf("Failed to list Endpoints during initialization of EndpointsInformer: %s", err)
×
165
                                }
×
166
                                return obj, err
×
167
                        },
168
                        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
×
169
                                obj, err := kubeClient.CoreV1().Endpoints(metav1.NamespaceAll).Watch(context.TODO(), options)
×
170
                                if err != nil {
×
171
                                        agent.log.Fatalf("Failed to watch Endpoints during initialization of EndpointsInformer: %s", err)
×
172
                                }
×
173
                                return obj, err
×
174
                        },
175
                })
176
}
177

178
func (agent *HostAgent) initEndpointsInformerBase(listWatch *cache.ListWatch) {
1✔
179
        agent.endpointsInformer = cache.NewSharedIndexInformer(
1✔
180
                listWatch,
1✔
181
                &v1.Endpoints{},
1✔
182
                controller.NoResyncPeriodFunc(),
1✔
183
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
1✔
184
        )
1✔
185
        agent.endpointsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
1✔
186
                AddFunc: func(obj interface{}) {
2✔
187
                        agent.endpointsChanged(obj)
1✔
188
                },
1✔
189
                UpdateFunc: func(_ interface{}, obj interface{}) {
×
190
                        agent.endpointsChanged(obj)
×
191
                },
×
192
                DeleteFunc: func(obj interface{}) {
×
193
                        agent.endpointsChanged(obj)
×
194
                },
×
195
        })
196
}
197

198
func (agent *HostAgent) initEndpointSliceInformerFromClient(
199
        kubeClient *kubernetes.Clientset) {
×
200
        agent.initEndpointSliceInformerBase(
×
201
                &cache.ListWatch{
×
202
                        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
×
203
                                obj, err := kubeClient.DiscoveryV1().EndpointSlices(metav1.NamespaceAll).List(context.TODO(), options)
×
204
                                if err != nil {
×
205
                                        agent.log.Fatalf("Failed to list EndpointSlices during initialization of EndpointSliceInformer: %s", err)
×
206
                                }
×
207
                                return obj, err
×
208
                        },
209
                        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
×
210
                                obj, err := kubeClient.DiscoveryV1().EndpointSlices(metav1.NamespaceAll).Watch(context.TODO(), options)
×
211
                                if err != nil {
×
212
                                        agent.log.Fatalf("Failed to watch EndpointSlices during initialization of EndpointSliceInformer: %s", err)
×
213
                                }
×
214
                                return obj, err
×
215
                        },
216
                })
217
}
218

219
func (agent *HostAgent) initEndpointSliceInformerBase(listWatch *cache.ListWatch) {
1✔
220
        agent.endpointSliceInformer = cache.NewSharedIndexInformer(
1✔
221
                listWatch,
1✔
222
                &discovery.EndpointSlice{},
1✔
223
                controller.NoResyncPeriodFunc(),
1✔
224
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
1✔
225
        )
1✔
226
        agent.endpointSliceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
1✔
227
                AddFunc: func(obj interface{}) {
2✔
228
                        agent.endpointSliceChanged(obj)
1✔
229
                },
1✔
230
                UpdateFunc: func(_ interface{}, obj interface{}) {
×
231
                        agent.endpointSliceChanged(obj)
×
232
                },
×
233
                DeleteFunc: func(obj interface{}) {
×
234
                        agent.endpointSliceChanged(obj)
×
235
                },
×
236
        })
237
}
238

239
func (agent *HostAgent) initServiceInformerFromClient(
240
        kubeClient *kubernetes.Clientset) {
×
241
        agent.initServiceInformerBase(
×
242
                &cache.ListWatch{
×
243
                        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
×
244
                                obj, err := kubeClient.CoreV1().Services(metav1.NamespaceAll).List(context.TODO(), options)
×
245
                                if err != nil {
×
246
                                        agent.log.Fatalf("Failed to list Services during initialization of ServiceInformer: %s", err)
×
247
                                }
×
248
                                return obj, err
×
249
                        },
250
                        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
×
251
                                obj, err := kubeClient.CoreV1().Services(metav1.NamespaceAll).Watch(context.TODO(), options)
×
252
                                if err != nil {
×
253
                                        agent.log.Fatalf("Failed to watch Services during initialization of ServiceInformer: %s", err)
×
254
                                }
×
255
                                return obj, err
×
256
                        },
257
                })
258
}
259

260
func (agent *HostAgent) initServiceInformerBase(listWatch *cache.ListWatch) {
1✔
261
        agent.serviceInformer = cache.NewSharedIndexInformer(
1✔
262
                listWatch,
1✔
263
                &v1.Service{},
1✔
264
                controller.NoResyncPeriodFunc(),
1✔
265
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
1✔
266
        )
1✔
267
        agent.serviceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
1✔
268
                AddFunc: func(obj interface{}) {
2✔
269
                        agent.serviceChanged(obj)
1✔
270
                },
1✔
271
                UpdateFunc: func(_ interface{}, obj interface{}) {
1✔
272
                        agent.serviceChanged(obj)
1✔
273
                },
1✔
274
                DeleteFunc: func(obj interface{}) {
1✔
275
                        agent.serviceDeleted(obj)
1✔
276
                },
1✔
277
        })
278
}
279

280
func writeAs(asfile string, as *opflexService) (bool, error) {
1✔
281
        newdata, err := json.MarshalIndent(as, "", "  ")
1✔
282
        if err != nil {
1✔
283
                return true, err
×
284
        }
×
285
        existingdata, err := os.ReadFile(asfile)
1✔
286
        if err == nil && reflect.DeepEqual(existingdata, newdata) {
2✔
287
                return false, nil
1✔
288
        }
1✔
289

290
        err = os.WriteFile(asfile, newdata, 0644)
1✔
291
        return true, err
1✔
292
}
293

294
func serviceLogger(log *logrus.Logger, as *v1.Service) *logrus.Entry {
×
295
        return log.WithFields(logrus.Fields{
×
296
                "namespace": as.ObjectMeta.Namespace,
×
297
                "name":      as.ObjectMeta.Name,
×
298
                "type":      as.Spec.Type,
×
299
        })
×
300
}
×
301

302
func opflexServiceLogger(log *logrus.Logger, as *opflexService) *logrus.Entry {
1✔
303
        return log.WithFields(logrus.Fields{
1✔
304
                "namespace": as.Attributes["namespace"],
1✔
305
                "name":      as.Attributes["name"],
1✔
306
                "uuid":      as.Uuid,
1✔
307
                "tenant":    as.DomainPolicySpace,
1✔
308
                "vrf":       as.DomainName,
1✔
309
        })
1✔
310
}
1✔
311

312
func (agent *HostAgent) syncServices() bool {
1✔
313
        if !agent.syncEnabled {
1✔
314
                return false
×
315
        }
×
316

317
        agent.log.Debug("Syncing services")
1✔
318
        agent.indexMutex.Lock()
1✔
319
        opflexServices := make(map[string]*opflexService)
1✔
320
        for k, v := range agent.opflexServices {
2✔
321
                val := &opflexService{}
1✔
322
                err := util.DeepCopyObj(v, val)
1✔
323
                if err != nil {
1✔
324
                        continue
×
325
                }
326
                opflexServices[k] = val
1✔
327
        }
328
        agent.indexMutex.Unlock()
1✔
329

1✔
330
        files, err := os.ReadDir(agent.config.OpFlexServiceDir)
1✔
331
        if err != nil {
1✔
332
                agent.log.WithFields(
×
333
                        logrus.Fields{"serviceDir": agent.config.OpFlexServiceDir},
×
334
                ).Error("Could not read directory " + err.Error())
×
335
                return true
×
336
        }
×
337
        seen := make(map[string]bool)
1✔
338
        for _, f := range files {
2✔
339
                uuid := f.Name()
1✔
340
                if strings.HasSuffix(uuid, ".as") {
1✔
341
                        uuid = uuid[:len(uuid)-3]
×
342
                } else if strings.HasSuffix(f.Name(), ".service") {
2✔
343
                        uuid = uuid[:len(uuid)-8]
1✔
344
                } else {
2✔
345
                        continue
1✔
346
                }
347

348
                asfile := filepath.Join(agent.config.OpFlexServiceDir, f.Name())
1✔
349
                logger := agent.log.WithFields(
1✔
350
                        logrus.Fields{"Uuid": uuid},
1✔
351
                )
1✔
352
                existing, ok := opflexServices[uuid]
1✔
353
                if ok {
2✔
354
                        wrote, err := writeAs(asfile, existing)
1✔
355
                        if err != nil {
1✔
356
                                opflexServiceLogger(agent.log, existing).
×
357
                                        Error("Error writing service file: ", err)
×
358
                        } else if wrote {
2✔
359
                                opflexServiceLogger(agent.log, existing).Info("Updated service")
1✔
360
                        }
1✔
361
                        seen[uuid] = true
1✔
362
                } else {
1✔
363
                        logger.Info("Removing service")
1✔
364
                        os.Remove(asfile)
1✔
365
                }
1✔
366
        }
367

368
        for _, as := range opflexServices {
2✔
369
                if seen[as.Uuid] {
2✔
370
                        continue
1✔
371
                }
372

373
                opflexServiceLogger(agent.log, as).Info("Adding service")
1✔
374
                asfile :=
1✔
375
                        filepath.Join(agent.config.OpFlexServiceDir, as.Uuid+".service")
1✔
376
                _, err = writeAs(asfile, as)
1✔
377
                if err != nil {
1✔
378
                        opflexServiceLogger(agent.log, as).
×
379
                                Error("Error writing service file: ", err)
×
380
                }
×
381
        }
382

383
        agent.log.Debug("Finished service sync")
1✔
384
        return false
1✔
385
}
386

387
// Must have index lock
388
func (agent *HostAgent) updateServiceDesc(external bool, as *v1.Service, key string) bool {
1✔
389
        if as.Spec.ClusterIP == "None" {
1✔
390
                agent.log.Debugf("ClusterIP of service %s is set to None", as.ObjectMeta.Name)
×
391
                return true
×
392
        }
×
393

394
        ofas := &opflexService{
1✔
395
                Uuid:              string(as.ObjectMeta.UID),
1✔
396
                DomainPolicySpace: agent.config.AciVrfTenant,
1✔
397
                DomainName:        agent.config.AciVrf,
1✔
398
                ServiceMode:       "loadbalancer",
1✔
399
                ServiceMappings:   make([]opflexServiceMapping, 0),
1✔
400
        }
1✔
401
        switch as.Spec.Type {
1✔
402
        case v1.ServiceTypeClusterIP:
×
403
                ofas.ServiceType = "clusterIp"
×
404
        case v1.ServiceTypeNodePort:
×
405
                ofas.ServiceType = "nodePort"
×
406
        case v1.ServiceTypeLoadBalancer:
1✔
407
                ofas.ServiceType = "loadBalancer"
1✔
408
        case v1.ServiceTypeExternalName:
×
409
                ofas.ServiceType = "externalName"
×
410
        }
411

412
        if external {
2✔
413
                if agent.config.UplinkIface == "" ||
1✔
414
                        agent.serviceEp.Ipv4 == nil ||
1✔
415
                        agent.serviceEp.Mac == "" {
2✔
416
                        return false
1✔
417
                }
1✔
418

419
                ofas.InterfaceName = agent.config.UplinkIface
1✔
420
                ofas.InterfaceVlan = uint16(agent.config.ServiceVlan)
1✔
421
                // Directly using the Uplink MacAdress instead of using Opflex injected mac
1✔
422
                ofas.ServiceMac = agent.config.UplinkMacAdress
1✔
423
                ofas.InterfaceIp = agent.serviceEp.Ipv4.String()
1✔
424
                ofas.Uuid += "-external"
1✔
425
        }
426
        hasValidMapping := false
1✔
427
        for ix := range as.Spec.Ports {
2✔
428
                hasValidMapping = agent.serviceEndPoints.SetOpflexService(ofas, as, external, key, &as.Spec.Ports[ix])
1✔
429
        }
1✔
430

431
        id := fmt.Sprintf("%s_%s", as.ObjectMeta.Namespace, as.ObjectMeta.Name)
1✔
432
        ofas.Attributes = as.ObjectMeta.Labels
1✔
433
        if ofas.Attributes == nil {
1✔
434
                ofas.Attributes = make(map[string]string)
×
435
        }
×
436
        ofas.Attributes["namespace"] = as.ObjectMeta.Namespace
1✔
437
        ofas.Attributes["name"] = as.ObjectMeta.Name
1✔
438
        ofas.Attributes["service-name"] = id
1✔
439

1✔
440
        existing, ok := agent.opflexServices[ofas.Uuid]
1✔
441
        if hasValidMapping {
2✔
442
                if (ok && !reflect.DeepEqual(existing, ofas)) || !ok {
2✔
443
                        agent.opflexServices[ofas.Uuid] = ofas
1✔
444
                        // Check matching oc serivce and create a extra service file.
1✔
445
                        // This Change is specfic to Openshfit domain
1✔
446
                        agent.setOpenShfitService(as, external, ofas)
1✔
447
                }
1✔
448
                return true
1✔
449
        }
450
        if ok {
1✔
451
                delete(agent.opflexServices, ofas.Uuid)
×
452
                return true
×
453
        }
×
454

455
        return false
1✔
456
}
457

458
// must have index lock
459
func (agent *HostAgent) doUpdateService(key string) {
1✔
460
        asobj, exists, err := agent.serviceInformer.GetStore().GetByKey(key)
1✔
461
        if err != nil {
1✔
462
                agent.log.Error("Could not lookup service for " +
×
463
                        key + ": " + err.Error())
×
464
                return
×
465
        }
×
466
        if !exists || asobj == nil {
2✔
467
                return
1✔
468
        }
1✔
469
        as := asobj.(*v1.Service)
1✔
470
        doSync := false
1✔
471
        doSync = agent.updateServiceDesc(false, as, key) || doSync
1✔
472
        doSync = agent.updateServiceDesc(true, as, key) || doSync
1✔
473
        if doSync {
2✔
474
                agent.scheduleSyncServices()
1✔
475
                agent.updateEpFileWithClusterIp(as, false)
1✔
476
                agent.scheduleSyncEps()
1✔
477
        }
1✔
478
}
479

480
func (agent *HostAgent) endpointsChanged(obj interface{}) {
1✔
481
        agent.indexMutex.Lock()
1✔
482
        defer agent.indexMutex.Unlock()
1✔
483

1✔
484
        endpoints := obj.(*v1.Endpoints)
1✔
485
        agent.log.Debugf("Endpoint changed: name=%s namespace=%s",
1✔
486
                endpoints.ObjectMeta.Name, endpoints.ObjectMeta.Namespace)
1✔
487

1✔
488
        key, err := cache.MetaNamespaceKeyFunc(endpoints)
1✔
489
        if err != nil {
1✔
490
                agent.log.Error("Could not create key:" + err.Error())
×
491
                return
×
492
        }
×
493
        agent.doUpdateService(key)
1✔
494
}
495
func getServiceKey(endPointSlice *discovery.EndpointSlice) (string, bool) {
1✔
496
        serviceName, ok := endPointSlice.Labels[discovery.LabelServiceName]
1✔
497
        if !ok {
1✔
498
                return "", false
×
499
        }
×
500
        return endPointSlice.ObjectMeta.Namespace + "/" + serviceName, true
1✔
501
}
502

503
func (agent *HostAgent) endpointSliceChanged(obj interface{}) {
1✔
504
        agent.indexMutex.Lock()
1✔
505
        defer agent.indexMutex.Unlock()
1✔
506
        endpointslice := obj.(*discovery.EndpointSlice)
1✔
507
        agent.log.Debugf("endpointslice changed: name=%s namespace=%s",
1✔
508
                endpointslice.ObjectMeta.Name, endpointslice.ObjectMeta.Namespace)
1✔
509
        servicekey, ok := getServiceKey(endpointslice)
1✔
510
        if !ok {
1✔
511
                return
×
512
        }
×
513
        agent.doUpdateService(servicekey)
1✔
514
}
515

516
func (agent *HostAgent) serviceChanged(obj interface{}) {
1✔
517
        agent.indexMutex.Lock()
1✔
518
        defer agent.indexMutex.Unlock()
1✔
519

1✔
520
        as := obj.(*v1.Service)
1✔
521
        agent.log.Debugf("Service changed: name=%s namespace=%s",
1✔
522
                as.ObjectMeta.Name, as.ObjectMeta.Namespace)
1✔
523

1✔
524
        key, err := cache.MetaNamespaceKeyFunc(as)
1✔
525
        if err != nil {
1✔
526
                serviceLogger(agent.log, as).
×
527
                        Error("Could not create service object key:" + err.Error())
×
528
                return
×
529
        }
×
530
        agent.doUpdateService(key)
1✔
531
        agent.handleObjectUpdateForSnat(obj)
1✔
532
}
533

534
func (agent *HostAgent) serviceDeleted(obj interface{}) {
1✔
535
        agent.indexMutex.Lock()
1✔
536
        defer agent.indexMutex.Unlock()
1✔
537

1✔
538
        as, isService := obj.(*v1.Service)
1✔
539
        if !isService {
1✔
540
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
541
                if !ok {
×
542
                        agent.log.Error("Received unexpected object: ", obj)
×
543
                        return
×
544
                }
×
545
                as, ok = deletedState.Obj.(*v1.Service)
×
546
                if !ok {
×
547
                        agent.log.Error("DeletedFinalStateUnknown contained non-Services object: ", deletedState.Obj)
×
548
                        return
×
549
                }
×
550
        }
551
        agent.log.Debugf("Service deleted: name=%s namespace=%s",
1✔
552
                as.ObjectMeta.Name, as.ObjectMeta.Namespace)
1✔
553

1✔
554
        u := string(as.ObjectMeta.UID)
1✔
555
        if _, ok := agent.opflexServices[u]; ok {
2✔
556
                delete(agent.opflexServices, u)
1✔
557
                delete(agent.opflexServices, u+"-external")
1✔
558
                for _, v := range agent.ocServices {
2✔
559
                        if v.Name == as.ObjectMeta.Name &&
1✔
560
                                v.Namespace == as.ObjectMeta.Namespace {
1✔
561
                                delete(agent.opflexServices, u+"-"+v.Name)
×
562
                        }
×
563
                }
564
                agent.scheduleSyncServices()
1✔
565
                agent.deleteServIpFromEp(u)
1✔
566
                agent.scheduleSyncEps()
1✔
567
        }
568
        agent.handleObjectDeleteForSnat(obj)
1✔
569
}
570

571
func (agent *HostAgent) updateAllServices() {
1✔
572
        if agent.serviceInformer == nil {
1✔
573
                return
×
574
        }
×
575
        store := agent.serviceInformer.GetStore()
1✔
576
        if store == nil {
1✔
577
                return
×
578
        }
×
579
        keys := agent.serviceInformer.GetStore().ListKeys()
1✔
580
        if keys == nil {
1✔
581
                return
×
582
        }
×
583

584
        agent.indexMutex.Lock()
1✔
585
        defer agent.indexMutex.Unlock()
1✔
586
        for _, key := range keys {
1✔
587
                agent.doUpdateService(key)
×
588
        }
×
589
}
590

591
// This API is get the OpenShift InfrastructreIp's
592
func (agent *HostAgent) getInfrastucreIp(serviceName string) string {
×
593
        if _, ok := Version[agent.config.Flavor]; ok {
×
594
                if serviceName == RouterInternalDefault {
×
595
                        return agent.config.InstallerProvlbIp
×
596
                }
×
597
                return ""
×
598
        }
599
        infraStructureInfo := &configv1.Infrastructure{
×
600
                TypeMeta:   metav1.TypeMeta{APIVersion: configv1.GroupVersion.String(), Kind: "Infrastructure"},
×
601
                ObjectMeta: metav1.ObjectMeta{Name: "cluster"},
×
602
        }
×
603
        cfg, _ := config.GetConfig()
×
604
        scheme := runtime.NewScheme()
×
605
        scheme.AddKnownTypes(configv1.SchemeGroupVersion, &configv1.Infrastructure{})
×
606
        scheme.AddKnownTypes(configv1.SchemeGroupVersion, &metav1.GetOptions{})
×
607
        rclient, err := client.New(cfg, client.Options{Scheme: scheme})
×
608
        if err != nil {
×
609
                agent.log.Error(err.Error())
×
610
                return ""
×
611
        }
×
612
        if rclient == nil {
×
613
                agent.log.Error("client is nil")
×
614
                return ""
×
615
        }
×
616
        err = rclient.Get(context.TODO(), types.NamespacedName{
×
617
                Name: "cluster"}, infraStructureInfo)
×
618
        if err != nil {
×
619
                agent.log.Error(err.Error())
×
620
                return ""
×
621
        }
×
622
        if infraStructureInfo.Status.Platform == configv1.OpenStackPlatformType {
×
623
                if infraStructureInfo.Status.PlatformStatus != nil &&
×
624
                        infraStructureInfo.Status.PlatformStatus.OpenStack != nil {
×
625
                        if serviceName == RouterInternalDefault {
×
626
                                return infraStructureInfo.Status.PlatformStatus.OpenStack.IngressIP
×
627
                        }
×
628
                }
629
        }
630
        return ""
×
631
}
632

633
func (agent *HostAgent) setOpenShfitService(as *v1.Service, external bool, ofas *opflexService) {
1✔
634
        if agent.config.AciVmmDomainType == "OpenShift" {
1✔
635
                if !external {
×
636
                        for _, v := range agent.ocServices {
×
637
                                // Check for Namespace is equal
×
638
                                if v.Namespace != as.ObjectMeta.Namespace {
×
639
                                        continue
×
640
                                }
641
                                // Check Service Name is equal
642
                                if v.Name != as.ObjectMeta.Name {
×
643
                                        continue
×
644
                                }
645
                                InfraIp := agent.getInfrastucreIp(as.ObjectMeta.Name)
×
646
                                agent.log.Debug("InfraIp: ", InfraIp)
×
647
                                if InfraIp == "" {
×
648
                                        continue
×
649
                                }
650
                                ocas := &opflexService{
×
651
                                        Uuid:              string(as.ObjectMeta.UID),
×
652
                                        DomainPolicySpace: agent.config.AciVrfTenant,
×
653
                                        DomainName:        agent.config.AciVrf,
×
654
                                        ServiceMode:       "loadbalancer",
×
655
                                        ServiceMappings:   make([]opflexServiceMapping, 0),
×
656
                                }
×
657
                                ocas.Uuid = ocas.Uuid + "-" + as.ObjectMeta.Name
×
658
                                for _, val := range ofas.ServiceMappings {
×
659
                                        val.ServiceIp = InfraIp
×
660
                                        ocas.ServiceMappings = append(ocas.ServiceMappings, val)
×
661
                                }
×
662
                                ocas.ServiceType = ofas.ServiceType
×
663
                                ocas.Attributes = ofas.Attributes
×
664
                                agent.opflexServices[ocas.Uuid] = ocas
×
665
                        }
666
                }
667
        }
668
}
669

670
func (sep *serviceEndpoint) SetOpflexService(ofas *opflexService, as *v1.Service,
671
        external bool, key string, sp *v1.ServicePort) bool {
1✔
672
        agent := sep.agent
1✔
673
        endpointsobj, exists, err :=
1✔
674
                agent.endpointsInformer.GetStore().GetByKey(key)
1✔
675
        if err != nil {
1✔
676
                agent.log.Error("Could not lookup endpoints for " +
×
677
                        key + ": " + err.Error())
×
678
                return false
×
679
        }
×
680
        if !exists || endpointsobj == nil {
1✔
681
                agent.log.Debugf("no endpoints for service %s/%s", as.Namespace, as.Name)
×
682
                return false
×
683
        }
×
684
        endpoints := endpointsobj.(*v1.Endpoints)
1✔
685
        hasValidMapping := false
1✔
686

1✔
687
        type void struct{}
1✔
688
        var ipexists void
1✔
689
        clusterIPs := make(map[string]void)
1✔
690
        clusterIPs[as.Spec.ClusterIP] = ipexists
1✔
691
        clusterIPsField := reflect.ValueOf(as.Spec).FieldByName("ClusterIPs")
1✔
692
        if clusterIPsField.IsValid() {
2✔
693
                for _, ip := range as.Spec.ClusterIPs {
1✔
694
                        clusterIPs[ip] = ipexists
×
695
                }
×
696
        }
697

698
        for clusterIP := range clusterIPs {
2✔
699
                for _, e := range endpoints.Subsets {
2✔
700
                        if len(e.Addresses) == 0 {
1✔
701
                                continue
×
702
                        }
703
                        parsedClusterIp := net.ParseIP(clusterIP)
1✔
704
                        parsedPodIp := net.ParseIP(e.Addresses[0].IP)
1✔
705

1✔
706
                        if parsedClusterIp == nil || parsedPodIp == nil {
1✔
707
                                agent.log.Info("Not a valid IP address..", parsedClusterIp, parsedPodIp)
×
708
                                continue
×
709
                        }
710
                        if parsedClusterIp.To4() != nil && parsedPodIp.To4() != nil {
2✔
711
                                agent.log.Info("Both are IPv4 addresses..", parsedClusterIp, parsedPodIp, "Adding to map..")
1✔
712
                        } else if parsedClusterIp.To4() == nil && parsedPodIp.To4() == nil {
3✔
713
                                agent.log.Info("Both are IPv6 addresses..", parsedClusterIp, parsedPodIp, "Adding to map..")
1✔
714
                        } else {
1✔
715
                                continue
×
716
                        }
717
                        for _, p := range e.Ports {
2✔
718
                                if p.Protocol != sp.Protocol {
1✔
719
                                        continue
×
720
                                }
721
                                if p.Name != sp.Name {
1✔
722
                                        continue
×
723
                                }
724

725
                                sm := &opflexServiceMapping{
1✔
726
                                        ServicePort:  uint16(sp.Port),
1✔
727
                                        ServiceProto: strings.ToLower(string(sp.Protocol)),
1✔
728
                                        NextHopIps:   make([]string, 0),
1✔
729
                                        NextHopPort:  uint16(p.Port),
1✔
730
                                        Conntrack:    true,
1✔
731
                                        NodePort:     uint16(sp.NodePort),
1✔
732
                                }
1✔
733

1✔
734
                                if external {
2✔
735
                                        if as.Spec.Type == v1.ServiceTypeLoadBalancer &&
1✔
736
                                                len(as.Status.LoadBalancer.Ingress) > 0 {
2✔
737
                                                for _, ip := range as.Status.LoadBalancer.Ingress {
2✔
738
                                                        LBIp := net.ParseIP(ip.IP)
1✔
739
                                                        if (LBIp.To4() != nil) == (parsedPodIp.To4() != nil) {
2✔
740
                                                                sm.ServiceIp = ip.IP
1✔
741
                                                                break
1✔
742
                                                        }
743
                                                }
744
                                        }
745
                                } else {
1✔
746
                                        sm.ServiceIp = clusterIP
1✔
747
                                }
1✔
748
                                sm.setServiceAffinityConfig(as)
1✔
749
                                for _, a := range e.Addresses {
2✔
750
                                        if !external ||
1✔
751
                                                (a.NodeName != nil && *a.NodeName == agent.config.NodeName) {
2✔
752
                                                sm.NextHopIps = append(sm.NextHopIps, a.IP)
1✔
753
                                        }
1✔
754
                                }
755
                                if sm.ServiceIp != "" && len(sm.NextHopIps) > 0 {
2✔
756
                                        hasValidMapping = true
1✔
757
                                }
1✔
758

759
                                ofas.ServiceMappings = append(ofas.ServiceMappings, *sm)
1✔
760
                        }
761
                }
762
        }
763
        return hasValidMapping
1✔
764
}
765

766
func (sm *opflexServiceMapping) setServiceAffinityConfig(as *v1.Service) {
1✔
767
        if as.Spec.SessionAffinity == "ClientIP" {
2✔
768
                config := as.Spec.SessionAffinityConfig
1✔
769
                if config != nil && config.ClientIP != nil && config.ClientIP.TimeoutSeconds != nil {
2✔
770
                        sm.SessionAffinity = &opflexSessionAffinityConfig{ClientIP: opflexClientIPConfig{TimeoutSeconds: *config.ClientIP.TimeoutSeconds}}
1✔
771
                } else {
1✔
772
                        sm.SessionAffinity = &opflexSessionAffinityConfig{ClientIP: opflexClientIPConfig{TimeoutSeconds: DefaultSessionAffinityTimer}}
×
773
                }
×
774
        } else {
×
775
                sm.SessionAffinity = &opflexSessionAffinityConfig{ClientIP: opflexClientIPConfig{TimeoutSeconds: TempSessionAffinityTimer}}
×
776
        }
×
777
}
778

779
func (seps *serviceEndpointSlice) addEmptyConntrackNatServiceMapping(ofsm *[]opflexServiceMapping, clusterIP string, sp *v1.ServicePort, as *v1.Service) {
×
780
        seps.agent.log.Infof("Service %s:%s has resilient hashing enabled", as.ObjectMeta.Namespace, as.ObjectMeta.Name)
×
781
        seps.agent.log.Infof("Adding empty service mapping for %s:%d for service %s:%s", clusterIP, sp.Port, as.ObjectMeta.Namespace, as.ObjectMeta.Name)
×
782
        sm := &opflexServiceMapping{
×
783
                ServiceIp:             clusterIP,
×
784
                ServicePort:           uint16(sp.Port),
×
785
                ServiceProto:          strings.ToLower(string(sp.Protocol)),
×
786
                NextHopIps:            make([]string, 0),
×
787
                TerminatingNextHopIps: make([]string, 0),
×
788
                Conntrack:             true,
×
789
                ConntrackNat:          true,
×
790
                NodePort:              uint16(sp.NodePort),
×
791
        }
×
792
        sm.setServiceAffinityConfig(as)
×
793
        *ofsm = append(*ofsm, *sm)
×
794
}
×
795

796
func (seps *serviceEndpointSlice) SetOpflexService(ofas *opflexService, as *v1.Service,
797
        external bool, key string, sp *v1.ServicePort) bool {
1✔
798
        agent := seps.agent
1✔
799
        hasValidMapping := false
1✔
800
        var endpointSlices []*discovery.EndpointSlice
1✔
801
        label := map[string]string{discovery.LabelServiceName: as.ObjectMeta.Name}
1✔
802
        selector := labels.SelectorFromSet(labels.Set(label))
1✔
803
        cache.ListAllByNamespace(agent.endpointSliceInformer.GetIndexer(), as.ObjectMeta.Namespace, selector,
1✔
804
                func(endpointSliceobj interface{}) {
2✔
805
                        endpointSlices = append(endpointSlices, endpointSliceobj.(*discovery.EndpointSlice))
1✔
806
                })
1✔
807

808
        type void struct{}
1✔
809
        var ipexists void
1✔
810
        clusterIPs := make(map[string]void)
1✔
811
        clusterIPs[as.Spec.ClusterIP] = ipexists
1✔
812
        clusterIPsField := reflect.ValueOf(as.Spec).FieldByName("ClusterIPs")
1✔
813
        if clusterIPsField.IsValid() {
2✔
814
                for _, ip := range as.Spec.ClusterIPs {
1✔
815
                        clusterIPs[ip] = ipexists
×
816
                }
×
817
        }
818

819
        for clusterIP := range clusterIPs {
2✔
820
                emtpyService := true
1✔
821
                for _, endpointSlice := range endpointSlices {
2✔
822
                        if !(len(endpointSlice.Endpoints) > 0 && len(endpointSlice.Endpoints[0].Addresses) > 0) {
1✔
823
                                continue
×
824
                        }
825
                        parsedClusterIp := net.ParseIP(clusterIP)
1✔
826
                        parsedPodIp := net.ParseIP(endpointSlice.Endpoints[0].Addresses[0])
1✔
827
                        if parsedClusterIp == nil || parsedPodIp == nil {
1✔
828
                                agent.log.Info("Not a valid IP address..", parsedClusterIp, parsedPodIp)
×
829
                                continue
×
830
                        }
831
                        if parsedClusterIp.To4() != nil && parsedPodIp.To4() != nil {
2✔
832
                                agent.log.Info("Both are IPv4 addresses..", parsedClusterIp, parsedPodIp, "Adding to map..")
1✔
833
                        } else if parsedClusterIp.To4() == nil && parsedPodIp.To4() == nil {
3✔
834
                                agent.log.Info("Both are IPv6 addresses..", parsedClusterIp, parsedPodIp, "Adding to map..")
1✔
835
                        } else {
1✔
836
                                continue
×
837
                        }
838
                        for _, p := range endpointSlice.Ports {
2✔
839
                                if p.Protocol != nil && *p.Protocol != sp.Protocol {
1✔
840
                                        continue
×
841
                                }
842

843
                                if p.Name != nil && *p.Name != sp.Name {
1✔
844
                                        continue
×
845
                                }
846

847
                                sm := &opflexServiceMapping{
1✔
848
                                        ServicePort:           uint16(sp.Port),
1✔
849
                                        ServiceProto:          strings.ToLower(string(sp.Protocol)),
1✔
850
                                        NextHopIps:            make([]string, 0),
1✔
851
                                        TerminatingNextHopIps: make([]string, 0),
1✔
852
                                        NextHopPort:           uint16(*p.Port),
1✔
853
                                        Conntrack:             true,
1✔
854
                                        NodePort:              uint16(sp.NodePort),
1✔
855
                                }
1✔
856

1✔
857
                                if external {
2✔
858
                                        if as.Spec.Type == v1.ServiceTypeLoadBalancer &&
1✔
859
                                                len(as.Status.LoadBalancer.Ingress) > 0 {
2✔
860
                                                for _, ip := range as.Status.LoadBalancer.Ingress {
2✔
861
                                                        LBIp := net.ParseIP(ip.IP)
1✔
862
                                                        if (LBIp.To4() != nil) == (parsedPodIp.To4() != nil) {
2✔
863
                                                                sm.ServiceIp = ip.IP
1✔
864
                                                                break
1✔
865
                                                        }
866
                                                }
867
                                        }
868
                                } else {
1✔
869
                                        sm.ServiceIp = clusterIP
1✔
870
                                }
1✔
871
                                nexthops := make(map[string][]string)
1✔
872
                                terminatingnexthops := make(map[string][]string)
1✔
873
                                var nodeZone string
1✔
874
                                for _, e := range endpointSlice.Endpoints {
2✔
875
                                        for _, a := range e.Addresses {
2✔
876
                                                if !external || (e.NodeName != nil && *e.NodeName == agent.config.NodeName) {
2✔
877
                                                        obj, exists, err := agent.nodeInformer.GetStore().GetByKey(agent.config.NodeName)
1✔
878
                                                        if err != nil {
1✔
879
                                                                agent.log.Error("Could not lookup node: ", err)
×
880
                                                                continue
×
881
                                                        }
882
                                                        if !exists && obj == nil {
1✔
883
                                                                agent.log.Error("Object nil")
×
884
                                                                continue
×
885
                                                        }
886
                                                        node := obj.(*v1.Node)
1✔
887
                                                        // Services need an annotation to inform the
1✔
888
                                                        // endpointslice controller to add hints
1✔
889
                                                        // Currently-1.22 only zones are used as hints.
1✔
890
                                                        // https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/2433-topology-aware-hints
1✔
891
                                                        var hintsEnabled = false
1✔
892
                                                        if val1, ok1 := as.ObjectMeta.Annotations["service.kubernetes.io/topology-aware-routing"]; ok1 && (strings.ToLower(val1) == "auto") {
1✔
893
                                                                hintsEnabled = true
×
894
                                                        }
×
895
                                                        // annotation deprecated in 1.27 but still supported
896
                                                        if val2, ok2 := as.ObjectMeta.Annotations["service.kubernetes.io/topology-aware-hints"]; ok2 && (strings.ToLower(val2) == "auto") {
1✔
897
                                                                hintsEnabled = true
×
898
                                                        }
×
899
                                                        // annotation as of 1.27
900
                                                        if val3, ok3 := as.ObjectMeta.Annotations[v1.AnnotationTopologyMode]; ok3 && (strings.ToLower(val3) == "auto") {
2✔
901
                                                                hintsEnabled = true
1✔
902
                                                        }
1✔
903
                                                        zone, zoneOk := node.ObjectMeta.Labels[v1.LabelTopologyZone]
1✔
904
                                                        nodeZone = zone
1✔
905
                                                        if !external && zoneOk && hintsEnabled && e.Hints != nil {
2✔
906
                                                                for _, hintZone := range e.Hints.ForZones {
2✔
907
                                                                        if nodeZone == hintZone.Name {
2✔
908
                                                                                if *e.Conditions.Ready {
2✔
909
                                                                                        nexthops["topologyawarehints"] =
1✔
910
                                                                                                append(nexthops["topologyawarehints"], a)
1✔
911
                                                                                } else if as.Spec.Type == v1.ServiceTypeClusterIP && e.Conditions.Terminating != nil && *e.Conditions.Terminating {
1✔
912
                                                                                        terminatingnexthops["topologyawarehints"] =
×
913
                                                                                                append(terminatingnexthops["topologyawarehints"], a)
×
914
                                                                                }
×
915
                                                                        }
916
                                                                }
917
                                                        } else {
1✔
918
                                                                if *e.Conditions.Ready {
2✔
919
                                                                        nexthops["any"] = append(nexthops["any"], a)
1✔
920
                                                                } else if as.Spec.Type == v1.ServiceTypeClusterIP && e.Conditions.Terminating != nil && *e.Conditions.Terminating {
1✔
921
                                                                        terminatingnexthops["any"] =
×
922
                                                                                append(terminatingnexthops["any"], a)
×
923
                                                                }
×
924
                                                        }
925
                                                }
926
                                        }
927
                                }
928
                                // Select the high priority keys as datapath doesn't have support for fallback
929
                                if _, ok := nexthops["topologyawarehints"]; ok {
2✔
930
                                        sm.NextHopIps = append(sm.NextHopIps, nexthops["topologyawarehints"]...)
1✔
931
                                        sm.TerminatingNextHopIps = append(sm.TerminatingNextHopIps, terminatingnexthops["topologyawarehints"]...)
1✔
932
                                        agent.log.Info("NextHopIps are", sm.NextHopIps)
1✔
933
                                        agent.log.Info("TerminatingNextHopIps are", sm.TerminatingNextHopIps)
1✔
934
                                        agent.log.Debugf("Topology matching hint: %s Nexthops: %s", nodeZone, sm.NextHopIps)
1✔
935
                                } else {
2✔
936
                                        sm.NextHopIps = append(sm.NextHopIps, nexthops["any"]...)
1✔
937
                                        sm.TerminatingNextHopIps = append(sm.TerminatingNextHopIps, terminatingnexthops["any"]...)
1✔
938
                                        agent.log.Info("NextHopIps are", sm.NextHopIps)
1✔
939
                                        agent.log.Info("TerminatingNextHopIps are", sm.TerminatingNextHopIps)
1✔
940
                                }
1✔
941
                                sm.setServiceAffinityConfig(as)
1✔
942
                                if !seps.agent.config.DisableOpflexResilientHashing && as.Spec.Type == v1.ServiceTypeClusterIP && as.Spec.SessionAffinity == "ClientIP" {
1✔
943
                                        sm.ConntrackNat = true
×
944
                                        agent.log.Infof("Service %s:%s has resilient hashing enabled", as.ObjectMeta.Namespace, as.ObjectMeta.Name)
×
945
                                }
×
946
                                // Mark the mapping as valid to prevent opflex service deletion, even if the
947
                                // service has only terminating endpoints. This is necessary to support graceful
948
                                // connection termination and sending connection resets when resilient hashing
949
                                // is enabled.
950
                                if sm.ServiceIp != "" &&
1✔
951
                                        (len(sm.NextHopIps) > 0 || (sm.ConntrackNat && len(sm.TerminatingNextHopIps) > 0)) {
2✔
952
                                        hasValidMapping = true
1✔
953
                                }
1✔
954
                                ofas.ServiceMappings = append(ofas.ServiceMappings, *sm)
1✔
955
                                emtpyService = false
1✔
956
                        }
957
                }
958
                // For service with resilient hashing, add an empty mapping to support sending
959
                // RST / ICMP Unreachable messages to clients when there are no endpoints.
960
                if emtpyService &&
1✔
961
                        !external &&
1✔
962
                        !seps.agent.config.DisableOpflexResilientHashing &&
1✔
963
                        as.Spec.Type == v1.ServiceTypeClusterIP &&
1✔
964
                        as.Spec.SessionAffinity == "ClientIP" {
1✔
965
                        seps.addEmptyConntrackNatServiceMapping(&ofas.ServiceMappings, clusterIP, sp, as)
×
966
                        hasValidMapping = true
×
967
                }
×
968
        }
969
        return hasValidMapping
1✔
970
}
971

972
func (agent *HostAgent) updateEpFileWithClusterIp(as *v1.Service, deleted bool) {
1✔
973
        suid := string(as.ObjectMeta.UID)
1✔
974
        ofas, ok := agent.opflexServices[suid]
1✔
975
        var dummy struct{}
1✔
976
        if ok {
2✔
977
                if as.Spec.ClusterIP == "None" {
1✔
978
                        return
×
979
                }
×
980
                podKeys := agent.getPodKeysFromSm(ofas.ServiceMappings)
1✔
981
                current := make(map[string]struct{})
1✔
982
                for _, key := range podKeys {
2✔
983
                        obj, exists, err := agent.podInformer.GetStore().GetByKey(key)
1✔
984
                        if err == nil && exists && (obj != nil) {
2✔
985
                                pod := obj.(*v1.Pod)
1✔
986
                                if agent.config.NodeName != pod.Spec.NodeName {
1✔
987
                                        continue
×
988
                                }
989
                                poduid := string(pod.ObjectMeta.UID)
1✔
990
                                if _, sok := agent.servicetoPodUids[suid]; !sok {
2✔
991
                                        agent.servicetoPodUids[suid] = make(map[string]struct{})
1✔
992
                                }
1✔
993
                                agent.servicetoPodUids[suid][poduid] = dummy
1✔
994
                                if _, podok := agent.podtoServiceUids[poduid]; !podok {
2✔
995
                                        agent.podtoServiceUids[poduid] = make(map[string][]string)
1✔
996
                                }
1✔
997

998
                                type void struct{}
1✔
999
                                var ipexists void
1✔
1000
                                clusterIPs := make(map[string]void)
1✔
1001
                                clusterIPs[as.Spec.ClusterIP] = ipexists
1✔
1002
                                clusterIPsField := reflect.ValueOf(as.Spec).FieldByName("ClusterIPs")
1✔
1003
                                if clusterIPsField.IsValid() {
2✔
1004
                                        for _, ip := range as.Spec.ClusterIPs {
1✔
1005
                                                clusterIPs[ip] = ipexists
×
1006
                                        }
×
1007
                                }
1008

1009
                                var listClusterIPs []string
1✔
1010
                                for ip := range clusterIPs {
2✔
1011
                                        listClusterIPs = append(listClusterIPs, ip)
1✔
1012
                                }
1✔
1013
                                agent.podtoServiceUids[poduid][suid] = listClusterIPs
1✔
1014
                                agent.log.Info("EpUpdated: ", poduid, " with ClusterIp: ", listClusterIPs)
1✔
1015
                                current[poduid] = dummy
1✔
1016
                        }
1017
                }
1018
                // reconcile with the current pods matching the service
1019
                // if there is any stale info remove that from service matching the pods
1020
                // update the revese map for the pod to service
1021
                poduids := agent.servicetoPodUids[suid]
1✔
1022
                for id := range poduids {
2✔
1023
                        if _, ok := current[id]; !ok {
1✔
1024
                                delete(agent.servicetoPodUids[suid], id)
×
1025
                                delete(agent.podtoServiceUids[id], suid)
×
1026
                                if len(agent.podtoServiceUids[id]) == 0 {
×
1027
                                        delete(agent.podtoServiceUids, id)
×
1028
                                }
×
1029
                        }
1030
                }
1031
        } else {
×
1032
                agent.deleteServIpFromEp(suid)
×
1033
        }
×
1034
}
1035

1036
func (agent *HostAgent) getPodKeysFromSm(sm []opflexServiceMapping) []string {
1✔
1037
        var podkeys []string
1✔
1038
        if len(sm) > 0 {
2✔
1039
                podIps := sm[0].NextHopIps
1✔
1040
                for _, ip := range podIps {
2✔
1041
                        podkey, ok := agent.podIpToName[ip]
1✔
1042
                        if ok {
2✔
1043
                                podkeys = append(podkeys, podkey)
1✔
1044
                        }
1✔
1045
                }
1046
        }
1047
        return podkeys
1✔
1048
}
1049

1050
func (agent *HostAgent) getServiceIPs(poduid string) []string {
1✔
1051
        var ips []string
1✔
1052
        agent.indexMutex.Lock()
1✔
1053
        v, ok := agent.podtoServiceUids[poduid]
1✔
1054
        if ok {
2✔
1055
                for _, service_ips := range v {
2✔
1056
                        ips = append(ips, service_ips...)
1✔
1057
                }
1✔
1058
        }
1059
        agent.indexMutex.Unlock()
1✔
1060
        return ips
1✔
1061
}
1062

1063
func (agent *HostAgent) deleteServIpFromEp(suid string) {
1✔
1064
        v, ok := agent.servicetoPodUids[suid]
1✔
1065
        if ok {
2✔
1066
                for poduid := range v {
2✔
1067
                        if _, ok := agent.podtoServiceUids[poduid]; ok {
2✔
1068
                                delete(agent.podtoServiceUids[poduid], suid)
1✔
1069
                                if len(agent.podtoServiceUids[poduid]) == 0 {
2✔
1070
                                        delete(agent.podtoServiceUids, poduid)
1✔
1071
                                }
1✔
1072
                        }
1073
                }
1074
                delete(agent.servicetoPodUids, suid)
1✔
1075
        }
1076
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc