• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 13255351680

11 Feb 2025 03:34AM UTC coverage: 22.115% (-0.2%) from 22.323%
13255351680

Pull #4894

github

changluyi
fix e2e

Signed-off-by: clyi <clyi@alauda.io>
Pull Request #4894: ovn lb select the local chassis's backend prefer

0 of 446 new or added lines in 8 files covered. (0.0%)

3 existing lines in 2 files now uncovered.

10376 of 46919 relevant lines covered (22.11%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/controller_linux.go
1
package daemon
2

3
import (
4
        "errors"
5
        "fmt"
6
        "net"
7
        "os"
8
        "os/exec"
9
        "path/filepath"
10
        "reflect"
11
        "slices"
12
        "strings"
13
        "syscall"
14

15
        ovsutil "github.com/digitalocean/go-openvswitch/ovs"
16
        nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
17
        "github.com/kubeovn/felix/ipsets"
18
        "github.com/kubeovn/go-iptables/iptables"
19
        "github.com/vishvananda/netlink"
20
        "golang.org/x/sys/unix"
21
        v1 "k8s.io/api/core/v1"
22
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
23
        "k8s.io/apimachinery/pkg/labels"
24
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
25
        "k8s.io/client-go/tools/cache"
26
        "k8s.io/klog/v2"
27
        k8sipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset"
28
        k8siptables "k8s.io/kubernetes/pkg/util/iptables"
29

30
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
31
        "github.com/kubeovn/kube-ovn/pkg/ovs"
32
        "github.com/kubeovn/kube-ovn/pkg/util"
33
)
34

35
// Names of the kernel modules checked before a legacy-mode iptables client is
// created for the corresponding IP family.
const (
	// kernelModuleIPTables is the module checked for IPv4.
	kernelModuleIPTables = "ip_tables"

	// kernelModuleIP6Tables is the module checked for IPv6.
	kernelModuleIP6Tables = "ip6_tables"
)
39

40
// ControllerRuntime represents runtime specific controller members
41
type ControllerRuntime struct {
42
        iptables         map[string]*iptables.IPTables
43
        iptablesObsolete map[string]*iptables.IPTables
44
        k8siptables      map[string]k8siptables.Interface
45
        k8sipsets        k8sipset.Interface
46
        ipsets           map[string]*ipsets.IPSets
47
        gwCounters       map[string]*util.GwIPtableCounters
48

49
        nmSyncer  *networkManagerSyncer
50
        ovsClient *ovsutil.Client
51
}
52

53
// LbServiceRules carries the parameters of one OpenFlow rule generated for a
// LoadBalancer service: one value exists per (ingress IP, service port) pair.
// The type is comparable, so two rules can be checked for equality with ==.
type LbServiceRules struct {
	// IP is the load balancer ingress IP of the service.
	IP string
	// Port is the service port.
	Port uint16
	// Protocol is the service port protocol as a string.
	Protocol string
	// BridgeName is the OVS bridge the rule is installed on.
	BridgeName string
	// DstMac is the destination MAC handed to the flow when it is added.
	DstMac string
	// UnderlayNic is the underlay interface selected for the local node.
	UnderlayNic string
}
61

62
// evalCommandSymlinks looks cmd up in PATH and returns the path obtained after
// every symbolic link on the way to the executable has been resolved.
//
// An error is returned when the command cannot be found or when the symbolic
// links cannot be evaluated; the underlying error is wrapped in both cases.
func evalCommandSymlinks(cmd string) (string, error) {
	path, err := exec.LookPath(cmd)
	if err != nil {
		return "", fmt.Errorf("failed to search for command %q: %w", cmd, err)
	}

	file, err := filepath.EvalSymlinks(path)
	if err != nil {
		return "", fmt.Errorf("failed to evaluate symbolic links for file %q: %w", path, err)
	}

	return file, nil
}
74

75
func isLegacyIptablesMode() (bool, error) {
×
76
        path, err := evalCommandSymlinks("iptables")
×
77
        if err != nil {
×
78
                return false, err
×
79
        }
×
80
        pathLegacy, err := evalCommandSymlinks("iptables-legacy")
×
81
        if err != nil {
×
82
                return false, err
×
83
        }
×
84
        return path == pathLegacy, nil
×
85
}
86

87
func (c *Controller) initRuntime() error {
×
88
        ok, err := isLegacyIptablesMode()
×
89
        if err != nil {
×
90
                klog.Errorf("failed to check iptables mode: %v", err)
×
91
                return err
×
92
        }
×
93
        if !ok {
×
94
                // iptables works in nft mode, we should migrate iptables rules
×
95
                c.iptablesObsolete = make(map[string]*iptables.IPTables, 2)
×
96
        }
×
97

98
        c.iptables = make(map[string]*iptables.IPTables)
×
99
        c.ipsets = make(map[string]*ipsets.IPSets)
×
100
        c.gwCounters = make(map[string]*util.GwIPtableCounters)
×
101
        c.k8siptables = make(map[string]k8siptables.Interface)
×
102
        c.k8sipsets = k8sipset.New(c.k8sExec)
×
NEW
103
        c.ovsClient = ovsutil.New()
×
104

×
105
        if c.protocol == kubeovnv1.ProtocolIPv4 || c.protocol == kubeovnv1.ProtocolDual {
×
106
                ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
×
107
                if err != nil {
×
108
                        klog.Error(err)
×
109
                        return err
×
110
                }
×
111
                c.iptables[kubeovnv1.ProtocolIPv4] = ipt
×
112
                if c.iptablesObsolete != nil {
×
113
                        ok, err := kernelModuleLoaded(kernelModuleIPTables)
×
114
                        if err != nil {
×
115
                                klog.Errorf("failed to check kernel module %s: %v", kernelModuleIPTables, err)
×
116
                        }
×
117
                        if ok {
×
118
                                if ipt, err = iptables.NewWithProtocolAndMode(iptables.ProtocolIPv4, "legacy"); err != nil {
×
119
                                        klog.Error(err)
×
120
                                        return err
×
121
                                }
×
122
                                c.iptablesObsolete[kubeovnv1.ProtocolIPv4] = ipt
×
123
                        }
124
                }
125
                c.ipsets[kubeovnv1.ProtocolIPv4] = ipsets.NewIPSets(ipsets.NewIPVersionConfig(ipsets.IPFamilyV4, IPSetPrefix, nil, nil))
×
126
                c.k8siptables[kubeovnv1.ProtocolIPv4] = k8siptables.New(c.k8sExec, k8siptables.ProtocolIPv4)
×
127
        }
128
        if c.protocol == kubeovnv1.ProtocolIPv6 || c.protocol == kubeovnv1.ProtocolDual {
×
129
                ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv6)
×
130
                if err != nil {
×
131
                        klog.Error(err)
×
132
                        return err
×
133
                }
×
134
                c.iptables[kubeovnv1.ProtocolIPv6] = ipt
×
135
                if c.iptablesObsolete != nil {
×
136
                        ok, err := kernelModuleLoaded(kernelModuleIP6Tables)
×
137
                        if err != nil {
×
138
                                klog.Errorf("failed to check kernel module %s: %v", kernelModuleIP6Tables, err)
×
139
                        }
×
140
                        if ok {
×
141
                                if ipt, err = iptables.NewWithProtocolAndMode(iptables.ProtocolIPv6, "legacy"); err != nil {
×
142
                                        klog.Error(err)
×
143
                                        return err
×
144
                                }
×
145
                                c.iptablesObsolete[kubeovnv1.ProtocolIPv6] = ipt
×
146
                        }
147
                }
148
                c.ipsets[kubeovnv1.ProtocolIPv6] = ipsets.NewIPSets(ipsets.NewIPVersionConfig(ipsets.IPFamilyV6, IPSetPrefix, nil, nil))
×
149
                c.k8siptables[kubeovnv1.ProtocolIPv6] = k8siptables.New(c.k8sExec, k8siptables.ProtocolIPv6)
×
150
        }
151

152
        c.nmSyncer = newNetworkManagerSyncer()
×
153
        c.nmSyncer.Run(c.transferAddrsAndRoutes)
×
154

×
155
        return nil
×
156
}
157

NEW
158
func (c *Controller) handleEnableExternalLBAddressChange(oldSubnet, newSubnet *kubeovnv1.Subnet) error {
×
NEW
159
        var subnetName string
×
NEW
160
        var action string
×
NEW
161

×
NEW
162
        switch {
×
NEW
163
        case oldSubnet != nil && newSubnet != nil:
×
NEW
164
                subnetName = oldSubnet.Name
×
NEW
165
                if oldSubnet.Spec.EnableExternalLBAddress != newSubnet.Spec.EnableExternalLBAddress {
×
NEW
166
                        klog.Infof("EnableExternalLBAddress changed for subnet %s", newSubnet.Name)
×
NEW
167
                        if newSubnet.Spec.EnableExternalLBAddress {
×
NEW
168
                                action = "add"
×
NEW
169
                        } else {
×
NEW
170
                                action = "remove"
×
NEW
171
                        }
×
172
                }
NEW
173
        case oldSubnet != nil:
×
NEW
174
                subnetName = oldSubnet.Name
×
NEW
175
                if oldSubnet.Spec.EnableExternalLBAddress {
×
NEW
176
                        klog.Infof("EnableExternalLBAddress removed for subnet %s", oldSubnet.Name)
×
NEW
177
                        action = "remove"
×
NEW
178
                }
×
NEW
179
        case newSubnet != nil:
×
NEW
180
                subnetName = newSubnet.Name
×
NEW
181
                if newSubnet.Spec.EnableExternalLBAddress {
×
NEW
182
                        klog.Infof("EnableExternalLBAddress added for subnet %s", newSubnet.Name)
×
NEW
183
                        action = "add"
×
NEW
184
                }
×
185
        }
186

NEW
187
        if action != "" {
×
NEW
188
                services, err := c.servicesLister.List(labels.Everything())
×
NEW
189
                if err != nil {
×
NEW
190
                        klog.Errorf("failed to list services: %v", err)
×
NEW
191
                        return err
×
NEW
192
                }
×
193

NEW
194
                for _, svc := range services {
×
NEW
195
                        if svc.Annotations[util.ServiceExternalIPFromSubnetAnnotation] == subnetName {
×
NEW
196
                                klog.Infof("Service %s/%s has external LB address pool annotation from subnet %s, action: %s", svc.Namespace, svc.Name, subnetName, action)
×
NEW
197
                                switch action {
×
NEW
198
                                case "add":
×
NEW
199
                                        c.serviceQueue.Add(&serviceEvent{newObj: svc})
×
NEW
200
                                case "remove":
×
NEW
201
                                        c.serviceQueue.Add(&serviceEvent{oldObj: svc})
×
202
                                }
203
                        }
204
                }
205
        }
NEW
206
        return nil
×
207
}
208

209
func (c *Controller) reconcileRouters(event *subnetEvent) error {
×
210
        subnets, err := c.subnetsLister.List(labels.Everything())
×
211
        if err != nil {
×
212
                klog.Errorf("failed to list subnets %v", err)
×
213
                return err
×
214
        }
×
215

216
        if event != nil {
×
217
                var ok bool
×
218
                var oldSubnet, newSubnet *kubeovnv1.Subnet
×
219
                if event.oldObj != nil {
×
220
                        if oldSubnet, ok = event.oldObj.(*kubeovnv1.Subnet); !ok {
×
221
                                klog.Errorf("expected old subnet in subnetEvent but got %#v", event.oldObj)
×
222
                                return nil
×
223
                        }
×
224
                }
225
                if event.newObj != nil {
×
226
                        if newSubnet, ok = event.newObj.(*kubeovnv1.Subnet); !ok {
×
227
                                klog.Errorf("expected new subnet in subnetEvent but got %#v", event.newObj)
×
228
                                return nil
×
229
                        }
×
230
                }
231

NEW
232
                if err = c.handleEnableExternalLBAddressChange(oldSubnet, newSubnet); err != nil {
×
NEW
233
                        klog.Errorf("failed to handle enable external lb address change: %v", err)
×
NEW
234
                        return err
×
NEW
235
                }
×
236
                // handle policy routing
237
                rulesToAdd, rulesToDel, routesToAdd, routesToDel, err := c.diffPolicyRouting(oldSubnet, newSubnet)
×
238
                if err != nil {
×
239
                        klog.Errorf("failed to get policy routing difference: %v", err)
×
240
                        return err
×
241
                }
×
242
                // add new routes first
243
                for _, r := range routesToAdd {
×
244
                        if err = netlink.RouteReplace(&r); err != nil && !errors.Is(err, syscall.EEXIST) {
×
245
                                klog.Errorf("failed to replace route for subnet %s: %v", newSubnet.Name, err)
×
246
                                return err
×
247
                        }
×
248
                }
249
                // next, add new rules
250
                for _, r := range rulesToAdd {
×
251
                        if err = netlink.RuleAdd(&r); err != nil && !errors.Is(err, syscall.EEXIST) {
×
252
                                klog.Errorf("failed to add network rule for subnet %s: %v", newSubnet.Name, err)
×
253
                                return err
×
254
                        }
×
255
                }
256
                // then delete old network rules
257
                for _, r := range rulesToDel {
×
258
                        // loop to delete all matched rules
×
259
                        for {
×
260
                                if err = netlink.RuleDel(&r); err != nil {
×
261
                                        if !errors.Is(err, syscall.ENOENT) {
×
262
                                                klog.Errorf("failed to delete network rule for subnet %s: %v", oldSubnet.Name, err)
×
263
                                                return err
×
264
                                        }
×
265
                                        break
×
266
                                }
267
                        }
268
                }
269
                // last, delete old network routes
270
                for _, r := range routesToDel {
×
271
                        if err = netlink.RouteDel(&r); err != nil && !errors.Is(err, syscall.ENOENT) {
×
272
                                klog.Errorf("failed to delete route for subnet %s: %v", oldSubnet.Name, err)
×
273
                                return err
×
274
                        }
×
275
                }
276
        }
277

278
        node, err := c.nodesLister.Get(c.config.NodeName)
×
279
        if err != nil {
×
280
                klog.Errorf("failed to get node %s %v", c.config.NodeName, err)
×
281
                return err
×
282
        }
×
283
        nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
×
284
        var joinIPv4, joinIPv6 string
×
285
        if len(node.Annotations) != 0 {
×
286
                joinIPv4, joinIPv6 = util.SplitStringIP(node.Annotations[util.IPAddressAnnotation])
×
287
        }
×
288

289
        joinCIDR := make([]string, 0, 2)
×
290
        cidrs := make([]string, 0, len(subnets)*2)
×
291
        for _, subnet := range subnets {
×
292
                // The route for overlay subnet cidr via ovn0 should not be deleted even though subnet.Status has changed to not ready
×
293
                if subnet.Spec.Vpc != c.config.ClusterRouter ||
×
294
                        (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway && (!subnet.Spec.U2OInterconnection || (subnet.Spec.EnableLb != nil && *subnet.Spec.EnableLb))) ||
×
295
                        !subnet.Status.IsValidated() {
×
296
                        continue
×
297
                }
298

299
                for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
×
300
                        if _, ipNet, err := net.ParseCIDR(cidrBlock); err != nil {
×
301
                                klog.Errorf("%s is not a valid cidr block", cidrBlock)
×
302
                        } else {
×
303
                                if nodeIPv4 != "" && util.CIDRContainIP(cidrBlock, nodeIPv4) {
×
304
                                        continue
×
305
                                }
306
                                if nodeIPv6 != "" && util.CIDRContainIP(cidrBlock, nodeIPv6) {
×
307
                                        continue
×
308
                                }
309
                                cidrs = append(cidrs, ipNet.String())
×
310
                                if subnet.Name == c.config.NodeSwitch {
×
311
                                        joinCIDR = append(joinCIDR, ipNet.String())
×
312
                                }
×
313
                        }
314
                }
315
        }
316

317
        gateway, ok := node.Annotations[util.GatewayAnnotation]
×
318
        if !ok {
×
319
                klog.Errorf("annotation for node %s ovn.kubernetes.io/gateway not exists", node.Name)
×
320
                return errors.New("annotation for node ovn.kubernetes.io/gateway not exists")
×
321
        }
×
322
        nic, err := netlink.LinkByName(util.NodeNic)
×
323
        if err != nil {
×
324
                klog.Errorf("failed to get nic %s", util.NodeNic)
×
325
                return fmt.Errorf("failed to get nic %s", util.NodeNic)
×
326
        }
×
327

328
        allRoutes, err := getNicExistRoutes(nil, gateway)
×
329
        if err != nil {
×
330
                klog.Error(err)
×
331
                return err
×
332
        }
×
333
        nodeNicRoutes, err := getNicExistRoutes(nic, gateway)
×
334
        if err != nil {
×
335
                klog.Error(err)
×
336
                return err
×
337
        }
×
338
        toAdd, toDel := routeDiff(nodeNicRoutes, allRoutes, cidrs, joinCIDR, joinIPv4, joinIPv6, gateway, net.ParseIP(nodeIPv4), net.ParseIP(nodeIPv6))
×
339
        for _, r := range toDel {
×
340
                if err = netlink.RouteDel(&netlink.Route{Dst: r.Dst}); err != nil {
×
341
                        klog.Errorf("failed to del route %v", err)
×
342
                }
×
343
        }
344

345
        for _, r := range toAdd {
×
346
                r.LinkIndex = nic.Attrs().Index
×
347
                if err = netlink.RouteReplace(&r); err != nil {
×
348
                        klog.Errorf("failed to replace route %v: %v", r, err)
×
349
                }
×
350
        }
351

352
        return nil
×
353
}
354

NEW
355
func genLBServiceRules(service *v1.Service, bridgeName, underlayNic string) []LbServiceRules {
×
NEW
356
        var lbServiceRules []LbServiceRules
×
NEW
357
        for _, ingress := range service.Status.LoadBalancer.Ingress {
×
NEW
358
                for _, port := range service.Spec.Ports {
×
NEW
359
                        lbServiceRules = append(lbServiceRules, LbServiceRules{
×
NEW
360
                                IP:          ingress.IP,
×
NEW
361
                                Port:        uint16(port.Port), // #nosec G115
×
NEW
362
                                Protocol:    string(port.Protocol),
×
NEW
363
                                DstMac:      util.MasqueradeExternalLBAccessMac,
×
NEW
364
                                UnderlayNic: underlayNic,
×
NEW
365
                                BridgeName:  bridgeName,
×
NEW
366
                        })
×
NEW
367
                }
×
368
        }
NEW
369
        return lbServiceRules
×
370
}
371

NEW
372
func (c *Controller) diffExternalLBServiceRules(oldService, newService *v1.Service, isSubnetExternalLBEnabled bool) (lbServiceRulesToAdd, lbServiceRulesToDel []LbServiceRules, err error) {
×
NEW
373
        var oldlbServiceRules, newlbServiceRules []LbServiceRules
×
NEW
374

×
NEW
375
        if oldService != nil && oldService.Annotations[util.ServiceExternalIPFromSubnetAnnotation] != "" {
×
NEW
376
                oldBridgeName, underlayNic, err := c.getExtInfoBySubnet(oldService.Annotations[util.ServiceExternalIPFromSubnetAnnotation])
×
NEW
377
                if err != nil {
×
NEW
378
                        klog.Errorf("failed to get provider network by subnet %s: %v", oldService.Annotations[util.ServiceExternalIPFromSubnetAnnotation], err)
×
NEW
379
                        return nil, nil, err
×
NEW
380
                }
×
381

NEW
382
                oldlbServiceRules = genLBServiceRules(oldService, oldBridgeName, underlayNic)
×
383
        }
384

NEW
385
        if isSubnetExternalLBEnabled && newService != nil && newService.Annotations[util.ServiceExternalIPFromSubnetAnnotation] != "" {
×
NEW
386
                newBridgeName, underlayNic, err := c.getExtInfoBySubnet(newService.Annotations[util.ServiceExternalIPFromSubnetAnnotation])
×
NEW
387
                if err != nil {
×
NEW
388
                        klog.Errorf("failed to get provider network by subnet %s: %v", newService.Annotations[util.ServiceExternalIPFromSubnetAnnotation], err)
×
NEW
389
                        return nil, nil, err
×
NEW
390
                }
×
NEW
391
                newlbServiceRules = genLBServiceRules(newService, newBridgeName, underlayNic)
×
392
        }
393

NEW
394
        for _, oldRule := range oldlbServiceRules {
×
NEW
395
                found := false
×
NEW
396
                for _, newRule := range newlbServiceRules {
×
NEW
397
                        if oldRule == newRule {
×
NEW
398
                                found = true
×
NEW
399
                                break
×
400
                        }
401
                }
NEW
402
                if !found {
×
NEW
403
                        lbServiceRulesToDel = append(lbServiceRulesToDel, oldRule)
×
NEW
404
                }
×
405
        }
406

NEW
407
        for _, newRule := range newlbServiceRules {
×
NEW
408
                found := false
×
NEW
409
                for _, oldRule := range oldlbServiceRules {
×
NEW
410
                        if newRule == oldRule {
×
NEW
411
                                found = true
×
NEW
412
                                break
×
413
                        }
414
                }
NEW
415
                if !found {
×
NEW
416
                        lbServiceRulesToAdd = append(lbServiceRulesToAdd, newRule)
×
NEW
417
                }
×
418
        }
419

NEW
420
        return lbServiceRulesToAdd, lbServiceRulesToDel, nil
×
421
}
422

NEW
423
func (c *Controller) getExtInfoBySubnet(subnetName string) (string, string, error) {
×
NEW
424
        subnet, err := c.subnetsLister.Get(subnetName)
×
NEW
425
        if err != nil {
×
NEW
426
                klog.Errorf("failed to get subnet %s: %v", subnetName, err)
×
NEW
427
                return "", "", err
×
NEW
428
        }
×
429

NEW
430
        vlanName := subnet.Spec.Vlan
×
NEW
431
        if vlanName == "" {
×
NEW
432
                return "", "", errors.New("vlan not specified in subnet")
×
NEW
433
        }
×
434

NEW
435
        vlan, err := c.vlansLister.Get(vlanName)
×
NEW
436
        if err != nil {
×
NEW
437
                klog.Errorf("failed to get vlan %s: %v", vlanName, err)
×
NEW
438
                return "", "", err
×
NEW
439
        }
×
440

NEW
441
        providerNetworkName := vlan.Spec.Provider
×
NEW
442
        if providerNetworkName == "" {
×
NEW
443
                return "", "", errors.New("provider network not specified in vlan")
×
NEW
444
        }
×
445

NEW
446
        pn, err := c.providerNetworksLister.Get(providerNetworkName)
×
NEW
447
        if err != nil {
×
NEW
448
                klog.Errorf("failed to get provider network %s: %v", providerNetworkName, err)
×
NEW
449
                return "", "", err
×
NEW
450
        }
×
451

NEW
452
        underlayNic := pn.Spec.DefaultInterface
×
NEW
453
        for _, item := range pn.Spec.CustomInterfaces {
×
NEW
454
                if slices.Contains(item.Nodes, c.config.NodeName) {
×
NEW
455
                        underlayNic = item.Interface
×
NEW
456
                        break
×
457
                }
458
        }
NEW
459
        klog.Infof("Provider network: %s, Underlay NIC: %s", providerNetworkName, underlayNic)
×
NEW
460
        return util.ExternalBridgeName(providerNetworkName), underlayNic, nil
×
461
}
462

NEW
463
func (c *Controller) reconcileServices(event *serviceEvent) error {
×
NEW
464
        if event == nil {
×
NEW
465
                return nil
×
NEW
466
        }
×
NEW
467
        var ok bool
×
NEW
468
        var oldService, newService *v1.Service
×
NEW
469
        if event.oldObj != nil {
×
NEW
470
                if oldService, ok = event.oldObj.(*v1.Service); !ok {
×
NEW
471
                        klog.Errorf("expected old service in serviceEvent but got %#v", event.oldObj)
×
NEW
472
                        return nil
×
NEW
473
                }
×
474
        }
475

NEW
476
        if event.newObj != nil {
×
NEW
477
                if newService, ok = event.newObj.(*v1.Service); !ok {
×
NEW
478
                        klog.Errorf("expected new service in serviceEvent but got %#v", event.newObj)
×
NEW
479
                        return nil
×
NEW
480
                }
×
481
        }
482

483
        // check is the lb service IP related subnet's EnableExternalLBAddress
NEW
484
        isSubnetExternalLBEnabled := false
×
NEW
485
        if newService != nil && newService.Annotations[util.ServiceExternalIPFromSubnetAnnotation] != "" {
×
NEW
486
                subnet, err := c.subnetsLister.Get(newService.Annotations[util.ServiceExternalIPFromSubnetAnnotation])
×
NEW
487
                if err != nil {
×
NEW
488
                        klog.Errorf("failed to get subnet %s: %v", newService.Annotations[util.ServiceExternalIPFromSubnetAnnotation], err)
×
NEW
489
                        return err
×
NEW
490
                }
×
NEW
491
                isSubnetExternalLBEnabled = subnet.Spec.EnableExternalLBAddress
×
492
        }
493

NEW
494
        lbServiceRulesToAdd, lbServiceRulesToDel, err := c.diffExternalLBServiceRules(oldService, newService, isSubnetExternalLBEnabled)
×
NEW
495
        if err != nil {
×
NEW
496
                klog.Errorf("failed to get ip port difference: %v", err)
×
NEW
497
                return err
×
NEW
498
        }
×
499

NEW
500
        if len(lbServiceRulesToAdd) > 0 {
×
NEW
501
                for _, rule := range lbServiceRulesToAdd {
×
NEW
502
                        klog.Infof("Adding LB service rule: %+v", rule)
×
NEW
503
                        if err := ovs.AddOrUpdateUnderlaySubnetSvcLocalOpenFlow(c.ovsClient, rule.BridgeName, rule.IP, rule.Protocol, rule.DstMac, rule.UnderlayNic, rule.Port); err != nil {
×
NEW
504
                                klog.Errorf("failed to add or update underlay subnet svc local openflow: %v", err)
×
NEW
505
                        }
×
506
                }
507
        }
508

NEW
509
        if len(lbServiceRulesToDel) > 0 {
×
NEW
510
                for _, rule := range lbServiceRulesToDel {
×
NEW
511
                        klog.Infof("Delete LB service rule: %+v", rule)
×
NEW
512
                        if err := ovs.DeleteUnderlaySubnetSvcLocalOpenFlow(c.ovsClient, rule.BridgeName, rule.IP, rule.Protocol, rule.UnderlayNic, rule.Port); err != nil {
×
NEW
513
                                klog.Errorf("failed to delete underlay subnet svc local openflow: %v", err)
×
NEW
514
                        }
×
515
                }
516
        }
517

NEW
518
        return nil
×
519
}
520

521
func getNicExistRoutes(nic netlink.Link, gateway string) ([]netlink.Route, error) {
×
522
        var routes, existRoutes []netlink.Route
×
523
        var err error
×
524
        for _, gw := range strings.Split(gateway, ",") {
×
525
                if util.CheckProtocol(gw) == kubeovnv1.ProtocolIPv4 {
×
526
                        routes, err = netlink.RouteList(nic, netlink.FAMILY_V4)
×
527
                } else {
×
528
                        routes, err = netlink.RouteList(nic, netlink.FAMILY_V6)
×
529
                }
×
530
                if err != nil {
×
531
                        return nil, err
×
532
                }
×
533
                existRoutes = append(existRoutes, routes...)
×
534
        }
535
        return existRoutes, nil
×
536
}
537

538
func routeDiff(nodeNicRoutes, allRoutes []netlink.Route, cidrs, joinCIDR []string, joinIPv4, joinIPv6, gateway string, srcIPv4, srcIPv6 net.IP) (toAdd, toDel []netlink.Route) {
×
539
        // joinIPv6 is not used for now
×
540
        _ = joinIPv6
×
541

×
542
        for _, route := range nodeNicRoutes {
×
543
                if route.Scope == netlink.SCOPE_LINK || route.Dst == nil || route.Dst.IP.IsLinkLocalUnicast() {
×
544
                        continue
×
545
                }
546

547
                found := false
×
548
                for _, c := range cidrs {
×
549
                        if route.Dst.String() == c {
×
550
                                found = true
×
551
                                break
×
552
                        }
553
                }
554
                if !found {
×
555
                        toDel = append(toDel, route)
×
556
                }
×
557
                conflict := false
×
558
                for _, ar := range allRoutes {
×
559
                        if ar.Dst != nil && ar.Dst.String() == route.Dst.String() && ar.LinkIndex != route.LinkIndex {
×
560
                                // route conflict
×
561
                                conflict = true
×
562
                                break
×
563
                        }
564
                }
565
                if conflict {
×
566
                        toDel = append(toDel, route)
×
567
                }
×
568
        }
569
        if len(toDel) > 0 {
×
570
                klog.Infof("routes to delete: %v", toDel)
×
571
        }
×
572

573
        ipv4, ipv6 := util.SplitStringIP(gateway)
×
574
        gwV4, gwV6 := net.ParseIP(ipv4), net.ParseIP(ipv6)
×
575
        for _, c := range cidrs {
×
576
                var src, gw net.IP
×
577
                switch util.CheckProtocol(c) {
×
578
                case kubeovnv1.ProtocolIPv4:
×
579
                        src, gw = srcIPv4, gwV4
×
580
                case kubeovnv1.ProtocolIPv6:
×
581
                        src, gw = srcIPv6, gwV6
×
582
                }
583

584
                found := false
×
585
                for _, ar := range allRoutes {
×
586
                        if ar.Dst != nil && ar.Dst.String() == c {
×
587
                                // route already exist
×
588
                                found = true
×
589
                                break
×
590
                        }
591
                }
592
                if found {
×
593
                        continue
×
594
                }
595
                for _, r := range nodeNicRoutes {
×
596
                        if r.Dst == nil || r.Dst.String() != c {
×
597
                                continue
×
598
                        }
599
                        if src == nil {
×
600
                                if r.Src == nil {
×
601
                                        found = true
×
602
                                        break
×
603
                                }
604
                        } else if src.Equal(r.Src) {
×
605
                                found = true
×
606
                                break
×
607
                        }
608
                }
609
                if !found {
×
610
                        var priority int
×
611
                        scope := netlink.SCOPE_UNIVERSE
×
612
                        proto := netlink.RouteProtocol(syscall.RTPROT_STATIC)
×
613
                        if slices.Contains(joinCIDR, c) {
×
614
                                if util.CheckProtocol(c) == kubeovnv1.ProtocolIPv4 {
×
615
                                        src = net.ParseIP(joinIPv4)
×
616
                                } else {
×
617
                                        src, priority = nil, 256
×
618
                                }
×
619
                                gw, scope = nil, netlink.SCOPE_LINK
×
620
                                proto = netlink.RouteProtocol(unix.RTPROT_KERNEL)
×
621
                        }
622
                        _, cidr, _ := net.ParseCIDR(c)
×
623
                        toAdd = append(toAdd, netlink.Route{
×
624
                                Dst:      cidr,
×
625
                                Src:      src,
×
626
                                Gw:       gw,
×
627
                                Protocol: proto,
×
628
                                Scope:    scope,
×
629
                                Priority: priority,
×
630
                        })
×
631
                }
632
        }
633
        if len(toAdd) > 0 {
×
634
                klog.Infof("routes to add: %v", toAdd)
×
635
        }
×
636
        return
×
637
}
638

639
func getRulesToAdd(oldRules, newRules []netlink.Rule) []netlink.Rule {
×
640
        var toAdd []netlink.Rule
×
641

×
642
        for _, rule := range newRules {
×
643
                var found bool
×
644
                for _, r := range oldRules {
×
645
                        if r.Family == rule.Family && r.Priority == rule.Priority && r.Table == rule.Table && reflect.DeepEqual(r.Src, rule.Src) {
×
646
                                found = true
×
647
                                break
×
648
                        }
649
                }
650
                if !found {
×
651
                        toAdd = append(toAdd, rule)
×
652
                }
×
653
        }
654

655
        return toAdd
×
656
}
657

658
func getRoutesToAdd(oldRoutes, newRoutes []netlink.Route) []netlink.Route {
×
659
        var toAdd []netlink.Route
×
660

×
661
        for _, route := range newRoutes {
×
662
                var found bool
×
663
                for _, r := range oldRoutes {
×
664
                        if r.Equal(route) {
×
665
                                found = true
×
666
                                break
×
667
                        }
668
                }
669
                if !found {
×
670
                        toAdd = append(toAdd, route)
×
671
                }
×
672
        }
673

674
        return toAdd
×
675
}
676

677
func (c *Controller) diffPolicyRouting(oldSubnet, newSubnet *kubeovnv1.Subnet) (rulesToAdd, rulesToDel []netlink.Rule, routesToAdd, routesToDel []netlink.Route, err error) {
×
678
        oldRules, oldRoutes, err := c.getPolicyRouting(oldSubnet)
×
679
        if err != nil {
×
680
                klog.Error(err)
×
681
                return
×
682
        }
×
683
        newRules, newRoutes, err := c.getPolicyRouting(newSubnet)
×
684
        if err != nil {
×
685
                klog.Error(err)
×
686
                return
×
687
        }
×
688

689
        rulesToAdd = getRulesToAdd(oldRules, newRules)
×
690
        rulesToDel = getRulesToAdd(newRules, oldRules)
×
691
        routesToAdd = getRoutesToAdd(oldRoutes, newRoutes)
×
692
        routesToDel = getRoutesToAdd(newRoutes, oldRoutes)
×
693

×
694
        return
×
695
}
696

697
func (c *Controller) getPolicyRouting(subnet *kubeovnv1.Subnet) ([]netlink.Rule, []netlink.Route, error) {
×
698
        if subnet == nil || subnet.Spec.ExternalEgressGateway == "" || subnet.Spec.Vpc != c.config.ClusterRouter {
×
699
                return nil, nil, nil
×
700
        }
×
701
        if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType && !util.GatewayContains(subnet.Spec.GatewayNode, c.config.NodeName) {
×
702
                return nil, nil, nil
×
703
        }
×
704

705
        protocols := make([]string, 1, 2)
×
706
        if protocol := util.CheckProtocol(subnet.Spec.ExternalEgressGateway); protocol == kubeovnv1.ProtocolDual {
×
707
                protocols[0] = kubeovnv1.ProtocolIPv4
×
708
                protocols = append(protocols, kubeovnv1.ProtocolIPv6)
×
709
        } else {
×
710
                protocols[0] = protocol
×
711
        }
×
712

713
        cidr := strings.Split(subnet.Spec.CIDRBlock, ",")
×
714
        egw := strings.Split(subnet.Spec.ExternalEgressGateway, ",")
×
715

×
716
        // rules
×
717
        var rules []netlink.Rule
×
718
        rule := netlink.NewRule()
×
719
        rule.Table = int(subnet.Spec.PolicyRoutingTableID)
×
720
        rule.Priority = int(subnet.Spec.PolicyRoutingPriority)
×
721
        if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
×
722
                pods, err := c.podsLister.List(labels.Everything())
×
723
                if err != nil {
×
724
                        klog.Errorf("list pods failed, %+v", err)
×
725
                        return nil, nil, err
×
726
                }
×
727

728
                hostname := os.Getenv(util.HostnameEnv)
×
729
                for _, pod := range pods {
×
730
                        if pod.Spec.HostNetwork ||
×
731
                                pod.Status.PodIP == "" ||
×
732
                                pod.Annotations[util.LogicalSwitchAnnotation] != subnet.Name ||
×
733
                                pod.Spec.NodeName != hostname {
×
734
                                continue
×
735
                        }
736

737
                        for i := range protocols {
×
738
                                rule.Family, _ = util.ProtocolToFamily(protocols[i])
×
739

×
740
                                var ip net.IP
×
741
                                var maskBits int
×
742
                                if len(pod.Status.PodIPs) == 2 && protocols[i] == kubeovnv1.ProtocolIPv6 {
×
743
                                        ip = net.ParseIP(pod.Status.PodIPs[1].IP)
×
744
                                        maskBits = 128
×
745
                                } else if util.CheckProtocol(pod.Status.PodIP) == protocols[i] {
×
746
                                        ip = net.ParseIP(pod.Status.PodIP)
×
747
                                        maskBits = 32
×
748
                                        if rule.Family == netlink.FAMILY_V6 {
×
749
                                                maskBits = 128
×
750
                                        }
×
751
                                }
752

753
                                rule.Src = &net.IPNet{IP: ip, Mask: net.CIDRMask(maskBits, maskBits)}
×
754
                                rules = append(rules, *rule)
×
755
                        }
756
                }
757
        } else {
×
758
                for i := range protocols {
×
759
                        rule.Family, _ = util.ProtocolToFamily(protocols[i])
×
760
                        if len(cidr) == len(protocols) {
×
761
                                _, rule.Src, _ = net.ParseCIDR(cidr[i])
×
762
                        }
×
763
                        rules = append(rules, *rule)
×
764
                }
765
        }
766

767
        // routes
768
        var routes []netlink.Route
×
769
        for i := range protocols {
×
770
                routes = append(routes, netlink.Route{
×
771
                        Protocol: netlink.RouteProtocol(syscall.RTPROT_STATIC),
×
772
                        Table:    int(subnet.Spec.PolicyRoutingTableID),
×
773
                        Gw:       net.ParseIP(egw[i]),
×
774
                })
×
775
        }
×
776

777
        return rules, routes, nil
×
778
}
779

780
func (c *Controller) handlePod(key string) error {
×
781
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
×
782
        if err != nil {
×
783
                utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
×
784
                return nil
×
785
        }
×
786
        klog.Infof("handle qos update for pod %s/%s", namespace, name)
×
787

×
788
        pod, err := c.podsLister.Pods(namespace).Get(name)
×
789
        if err != nil {
×
790
                if k8serrors.IsNotFound(err) {
×
791
                        return nil
×
792
                }
×
793
                klog.Error(err)
×
794
                return err
×
795
        }
796

797
        if err := util.ValidatePodNetwork(pod.Annotations); err != nil {
×
798
                klog.Errorf("validate pod %s/%s failed, %v", namespace, name, err)
×
799
                c.recorder.Eventf(pod, v1.EventTypeWarning, "ValidatePodNetworkFailed", err.Error())
×
800
                return err
×
801
        }
×
802

803
        podName := pod.Name
×
804
        if pod.Annotations[fmt.Sprintf(util.VMAnnotationTemplate, util.OvnProvider)] != "" {
×
805
                podName = pod.Annotations[fmt.Sprintf(util.VMAnnotationTemplate, util.OvnProvider)]
×
806
        }
×
807

808
        // set default nic bandwidth
809
        //  ovsIngress and ovsEgress are derived from the pod's egress and ingress rate annotations respectively, their roles are reversed from the OVS interface perspective.
810
        ifaceID := ovs.PodNameToPortName(podName, pod.Namespace, util.OvnProvider)
×
811
        ovsIngress := pod.Annotations[util.EgressRateAnnotation]
×
812
        ovsEgress := pod.Annotations[util.IngressRateAnnotation]
×
813
        err = ovs.SetInterfaceBandwidth(podName, pod.Namespace, ifaceID, ovsIngress, ovsEgress)
×
814
        if err != nil {
×
815
                klog.Error(err)
×
816
                return err
×
817
        }
×
818
        err = ovs.ConfigInterfaceMirror(c.config.EnableMirror, pod.Annotations[util.MirrorControlAnnotation], ifaceID)
×
819
        if err != nil {
×
820
                klog.Error(err)
×
821
                return err
×
822
        }
×
823
        // set linux-netem qos
824
        err = ovs.SetNetemQos(podName, pod.Namespace, ifaceID, pod.Annotations[util.NetemQosLatencyAnnotation], pod.Annotations[util.NetemQosLimitAnnotation], pod.Annotations[util.NetemQosLossAnnotation], pod.Annotations[util.NetemQosJitterAnnotation])
×
825
        if err != nil {
×
826
                klog.Error(err)
×
827
                return err
×
828
        }
×
829

830
        // set multus-nic bandwidth
831
        attachNets, err := nadutils.ParsePodNetworkAnnotation(pod)
×
832
        if err != nil {
×
833
                klog.Error(err)
×
834
                return err
×
835
        }
×
836
        for _, multiNet := range attachNets {
×
837
                provider := fmt.Sprintf("%s.%s.%s", multiNet.Name, multiNet.Namespace, util.OvnProvider)
×
838
                if pod.Annotations[fmt.Sprintf(util.VMAnnotationTemplate, provider)] != "" {
×
839
                        podName = pod.Annotations[fmt.Sprintf(util.VMAnnotationTemplate, provider)]
×
840
                }
×
841
                if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, provider)] == "true" {
×
842
                        ifaceID = ovs.PodNameToPortName(podName, pod.Namespace, provider)
×
843

×
844
                        err = ovs.SetInterfaceBandwidth(podName, pod.Namespace, ifaceID, pod.Annotations[fmt.Sprintf(util.EgressRateAnnotationTemplate, provider)], pod.Annotations[fmt.Sprintf(util.IngressRateAnnotationTemplate, provider)])
×
845
                        if err != nil {
×
846
                                klog.Error(err)
×
847
                                return err
×
848
                        }
×
849
                        err = ovs.ConfigInterfaceMirror(c.config.EnableMirror, pod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)], ifaceID)
×
850
                        if err != nil {
×
851
                                klog.Error(err)
×
852
                                return err
×
853
                        }
×
854
                        err = ovs.SetNetemQos(podName, pod.Namespace, ifaceID, pod.Annotations[fmt.Sprintf(util.NetemQosLatencyAnnotationTemplate, provider)], pod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)], pod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)], pod.Annotations[fmt.Sprintf(util.NetemQosJitterAnnotationTemplate, provider)])
×
855
                        if err != nil {
×
856
                                klog.Error(err)
×
857
                                return err
×
858
                        }
×
859
                }
860
        }
861
        return nil
×
862
}
863

864
func (c *Controller) loopEncapIPCheck() {
×
865
        node, err := c.nodesLister.Get(c.config.NodeName)
×
866
        if err != nil {
×
867
                klog.Errorf("failed to get node %s %v", c.config.NodeName, err)
×
868
                return
×
869
        }
×
870

871
        if nodeTunnelName := node.GetAnnotations()[util.TunnelInterfaceAnnotation]; nodeTunnelName != "" {
×
872
                iface, err := findInterface(nodeTunnelName)
×
873
                if err != nil {
×
874
                        klog.Errorf("failed to find iface %s, %v", nodeTunnelName, err)
×
875
                        return
×
876
                }
×
877
                if iface.Flags&net.FlagUp == 0 {
×
878
                        klog.Errorf("iface %v is down", nodeTunnelName)
×
879
                        return
×
880
                }
×
881
                addrs, err := iface.Addrs()
×
882
                if err != nil {
×
883
                        klog.Errorf("failed to get iface addr. %v", err)
×
884
                        return
×
885
                }
×
886
                if len(addrs) == 0 {
×
887
                        klog.Errorf("iface %s has no ip address", nodeTunnelName)
×
888
                        return
×
889
                }
×
890
                if iface.Name != c.config.tunnelIface {
×
891
                        klog.Infof("use %s as tunnel interface", iface.Name)
×
892
                        c.config.tunnelIface = iface.Name
×
893
                }
×
894

895
                // if assigned iface in node annotation is down or with no ip, the error msg should be printed periodically
896
                if c.config.Iface == nodeTunnelName {
×
897
                        klog.V(3).Infof("node tunnel interface %s not changed", nodeTunnelName)
×
898
                        return
×
899
                }
×
900
                c.config.Iface = nodeTunnelName
×
901
                klog.Infof("Update node tunnel interface %v", nodeTunnelName)
×
902

×
903
                encapIP := strings.Split(addrs[0].String(), "/")[0]
×
904
                if err = setEncapIP(encapIP); err != nil {
×
905
                        klog.Errorf("failed to set encap ip %s for iface %s", encapIP, c.config.Iface)
×
906
                        return
×
907
                }
×
908
        }
909
}
910

911
func (c *Controller) ovnMetricsUpdate() {
×
912
        c.setOvnSubnetGatewayMetric()
×
913

×
914
        resetSysParaMetrics()
×
915
        c.setIPLocalPortRangeMetric()
×
916
        c.setCheckSumErrMetric()
×
917
        c.setDNSSearchMetric()
×
918
        c.setTCPTwRecycleMetric()
×
919
        c.setTCPMtuProbingMetric()
×
920
        c.setConntrackTCPLiberalMetric()
×
921
        c.setBridgeNfCallIptablesMetric()
×
922
        c.setIPv6RouteMaxsizeMetric()
×
923
        c.setTCPMemMetric()
×
924
}
×
925

926
func resetSysParaMetrics() {
×
927
        metricIPLocalPortRange.Reset()
×
928
        metricCheckSumErr.Reset()
×
929
        metricDNSSearch.Reset()
×
930
        metricTCPTwRecycle.Reset()
×
931
        metricTCPMtuProbing.Reset()
×
932
        metricConntrackTCPLiberal.Reset()
×
933
        metricBridgeNfCallIptables.Reset()
×
934
        metricTCPMem.Reset()
×
935
        metricIPv6RouteMaxsize.Reset()
×
936
}
×
937

938
func rotateLog() {
×
939
        output, err := exec.Command("logrotate", "/etc/logrotate.d/openvswitch").CombinedOutput()
×
940
        if err != nil {
×
941
                klog.Errorf("failed to rotate openvswitch log %q", output)
×
942
        }
×
943
        output, err = exec.Command("logrotate", "/etc/logrotate.d/ovn").CombinedOutput()
×
944
        if err != nil {
×
945
                klog.Errorf("failed to rotate ovn log %q", output)
×
946
        }
×
947
        output, err = exec.Command("logrotate", "/etc/logrotate.d/kubeovn").CombinedOutput()
×
948
        if err != nil {
×
949
                klog.Errorf("failed to rotate kube-ovn log %q", output)
×
950
        }
×
951
}
952

953
func kernelModuleLoaded(module string) (bool, error) {
×
954
        data, err := os.ReadFile("/proc/modules")
×
955
        if err != nil {
×
956
                klog.Errorf("failed to read /proc/modules: %v", err)
×
957
                return false, err
×
958
        }
×
959

960
        for _, line := range strings.Split(string(data), "\n") {
×
961
                if fields := strings.Fields(line); len(fields) != 0 && fields[0] == module {
×
962
                        return true, nil
×
963
                }
×
964
        }
965

966
        return false, nil
×
967
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc