• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 13425935899

20 Feb 2025 01:32AM UTC coverage: 22.263% (+0.2%) from 22.068%
13425935899

Pull #4991

github

zhangzujian
wip

Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>
Pull Request #4991: add support for internalTrafficPolicy=Local

175 of 617 new or added lines in 13 files covered. (28.36%)

8 existing lines in 5 files now uncovered.

10436 of 46876 relevant lines covered (22.26%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/controller/node.go
1
package controller
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "reflect"
8
        "slices"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        goping "github.com/prometheus-community/pro-bing"
14
        "github.com/scylladb/go-set/strset"
15
        v1 "k8s.io/api/core/v1"
16
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
17
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
        "k8s.io/apimachinery/pkg/labels"
19
        "k8s.io/apimachinery/pkg/types"
20
        "k8s.io/client-go/tools/cache"
21
        "k8s.io/klog/v2"
22

23
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
24
        "github.com/kubeovn/kube-ovn/pkg/ovs"
25
        "github.com/kubeovn/kube-ovn/pkg/util"
26
)
27

28
// ErrChassisNotFound is an error value carrying the chassis and node names
// that are reported in its message.
type ErrChassisNotFound struct {
	// Chassis is the chassis identifier included in the error message.
	Chassis string
	// Node is the node name included in the error message.
	Node string
}

// Error implements the error interface and renders both fields into a single
// human-readable message.
func (e *ErrChassisNotFound) Error() string {
	const format = "chassis %s not found for node %s"
	return fmt.Sprintf(format, e.Chassis, e.Node)
}
×
36

37
func (c *Controller) enqueueAddNode(obj interface{}) {
×
38
        key := cache.MetaObjectToName(obj.(*v1.Node)).String()
×
39
        klog.V(3).Infof("enqueue add node %s", key)
×
40
        c.addNodeQueue.Add(key)
×
41
}
×
42

43
func nodeReady(node *v1.Node) bool {
×
44
        var ready, networkUnavailable bool
×
45
        for _, c := range node.Status.Conditions {
×
46
                switch c.Type {
×
47
                case v1.NodeReady:
×
48
                        ready = c.Status == v1.ConditionTrue
×
49
                case v1.NodeNetworkUnavailable:
×
50
                        networkUnavailable = c.Status == v1.ConditionTrue
×
51
                }
52
        }
53
        return ready && !networkUnavailable
×
54
}
55

56
func (c *Controller) enqueueUpdateNode(oldObj, newObj interface{}) {
×
57
        oldNode := oldObj.(*v1.Node)
×
58
        newNode := newObj.(*v1.Node)
×
59

×
60
        if nodeReady(oldNode) != nodeReady(newNode) ||
×
61
                !reflect.DeepEqual(oldNode.Annotations, newNode.Annotations) {
×
62
                key := cache.MetaObjectToName(newNode).String()
×
63
                if len(newNode.Annotations) == 0 || newNode.Annotations[util.AllocatedAnnotation] != "true" {
×
64
                        klog.V(3).Infof("enqueue add node %s", key)
×
65
                        c.addNodeQueue.Add(key)
×
66
                } else {
×
67
                        klog.V(3).Infof("enqueue update node %s", key)
×
68
                        c.updateNodeQueue.Add(key)
×
69
                }
×
70
        }
71
}
72

73
func (c *Controller) enqueueDeleteNode(obj interface{}) {
×
74
        node := obj.(*v1.Node)
×
75
        key := cache.MetaObjectToName(node).String()
×
76
        klog.V(3).Infof("enqueue delete node %s", key)
×
77
        c.deletingNodeObjMap.Store(key, node)
×
78
        c.deleteNodeQueue.Add(key)
×
79
}
×
80

81
// nodeUnderlayAddressSetName builds the name of the underlay address set for
// a node and an address family (af, e.g. 4 or 6). Every '-' in the node name
// is replaced by '_'.
func nodeUnderlayAddressSetName(node string, af int) string {
	sanitized := strings.ReplaceAll(node, "-", "_")
	return fmt.Sprintf("node_%s_underlay_v%d", sanitized, af)
}
×
84

NEW
85
func (c *Controller) initChassisTemplateVar(node *v1.Node) error {
×
NEW
86
        if !c.config.EnableLb {
×
NEW
87
                if err := c.OVNNbClient.DeleteChassisTemplateVarByNodeName(node.Name); err != nil {
×
NEW
88
                        err = fmt.Errorf("failed to delete ovn-nb Chassis_Template_Var for node %s: %w", node.Name, err)
×
NEW
89
                        klog.Error(err)
×
NEW
90
                        return err
×
NEW
91
                }
×
NEW
92
                return nil
×
93
        }
94

NEW
95
        chassisName := node.Annotations[util.ChassisAnnotation]
×
NEW
96
        if chassisName == "" {
×
NEW
97
                // kube-ovn-cni not ready to set chassis
×
NEW
98
                klog.Infof("chassis annotation not found for node %s, skip initializing ovn-nb Chassis_Template_Var record", node.Name)
×
NEW
99
                return nil
×
NEW
100
        }
×
101

NEW
102
        if err := c.OVNNbClient.CreateChassisTemplateVar(node.Name, chassisName, nil); err != nil {
×
NEW
103
                err = fmt.Errorf("failed to create ovn-nb Chassis_Template_Var %s for node %s: %w", chassisName, node.Name, err)
×
NEW
104
                klog.Error(err)
×
NEW
105
                return err
×
NEW
106
        }
×
107

NEW
108
        return nil
×
109
}
110

111
func (c *Controller) handleAddNode(key string) error {
×
112
        c.nodeKeyMutex.LockKey(key)
×
113
        defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
×
114

115
        cachedNode, err := c.nodesLister.Get(key)
×
116
        if err != nil {
×
117
                if k8serrors.IsNotFound(err) {
×
118
                        return nil
×
119
                }
×
120
                klog.Errorf("failed to get node %s: %v", key, err)
×
121
                return err
×
122
        }
123
        node := cachedNode.DeepCopy()
×
124
        klog.Infof("handle add node %s", node.Name)
×
125

×
NEW
126
        // init ovn-nb Chassis_Template_Var record which is used by template Load_Balancer
×
NEW
127
        if err = c.initChassisTemplateVar(node); err != nil {
×
NEW
128
                klog.Errorf("failed to init Chassis_Template_Var for node %s: %v", node.Name, err)
×
NEW
129
                return err
×
NEW
130
        }
×
131

132
        subnets, err := c.subnetsLister.List(labels.Everything())
×
133
        if err != nil {
×
134
                klog.Errorf("failed to list subnets: %v", err)
×
135
                return err
×
136
        }
×
137

138
        nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
×
139
        for _, subnet := range subnets {
×
140
                if subnet.Spec.Vpc != c.config.ClusterRouter {
×
141
                        continue
×
142
                }
143

144
                v4, v6 := util.SplitStringIP(subnet.Spec.CIDRBlock)
×
145
                if subnet.Spec.Vlan == "" && (util.CIDRContainIP(v4, nodeIPv4) || util.CIDRContainIP(v6, nodeIPv6)) {
×
146
                        msg := fmt.Sprintf("internal IP address of node %s is in CIDR of subnet %s, this may result in network issues", node.Name, subnet.Name)
×
147
                        klog.Warning(msg)
×
148
                        c.recorder.Eventf(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: node.Name, UID: types.UID(node.Name)}}, v1.EventTypeWarning, "NodeAddressConflictWithSubnet", msg)
×
149
                        break
×
150
                }
151
        }
152

153
        if err = c.handleNodeAnnotationsForProviderNetworks(node); err != nil {
×
154
                klog.Errorf("failed to handle annotations of node %s for provider networks: %v", node.Name, err)
×
155
                return err
×
156
        }
×
157

158
        subnet, err := c.subnetsLister.Get(c.config.NodeSwitch)
×
159
        if err != nil {
×
160
                klog.Errorf("failed to get node subnet: %v", err)
×
161
                return err
×
162
        }
×
163

164
        var v4IP, v6IP, mac string
×
165
        portName := util.NodeLspName(key)
×
166
        if node.Annotations[util.AllocatedAnnotation] == "true" && node.Annotations[util.IPAddressAnnotation] != "" && node.Annotations[util.MacAddressAnnotation] != "" {
×
167
                macStr := node.Annotations[util.MacAddressAnnotation]
×
168
                v4IP, v6IP, mac, err = c.ipam.GetStaticAddress(portName, portName, node.Annotations[util.IPAddressAnnotation],
×
169
                        &macStr, node.Annotations[util.LogicalSwitchAnnotation], true)
×
170
                if err != nil {
×
171
                        klog.Errorf("failed to alloc static ip addrs for node %v: %v", node.Name, err)
×
172
                        return err
×
173
                }
×
174
        } else {
×
175
                v4IP, v6IP, mac, err = c.ipam.GetRandomAddress(portName, portName, nil, c.config.NodeSwitch, "", nil, true)
×
176
                if err != nil {
×
177
                        klog.Errorf("failed to alloc random ip addrs for node %v: %v", node.Name, err)
×
178
                        return err
×
179
                }
×
180
        }
181

182
        ipStr := util.GetStringIP(v4IP, v6IP)
×
183
        if err := c.OVNNbClient.CreateBareLogicalSwitchPort(c.config.NodeSwitch, portName, ipStr, mac); err != nil {
×
184
                klog.Errorf("failed to create logical switch port %s: %v", portName, err)
×
185
                return err
×
186
        }
×
187

188
        for _, ip := range strings.Split(ipStr, ",") {
×
189
                if ip == "" {
×
190
                        continue
×
191
                }
192

193
                nodeIP, af := nodeIPv4, 4
×
194
                protocol := util.CheckProtocol(ip)
×
195
                if protocol == kubeovnv1.ProtocolIPv6 {
×
196
                        nodeIP, af = nodeIPv6, 6
×
197
                }
×
198
                if nodeIP != "" {
×
199
                        var (
×
200
                                match       = fmt.Sprintf("ip%d.dst == %s", af, nodeIP)
×
201
                                action      = kubeovnv1.PolicyRouteActionReroute
×
202
                                externalIDs = map[string]string{
×
203
                                        "vendor":         util.CniTypeName,
×
204
                                        "node":           node.Name,
×
205
                                        "address-family": strconv.Itoa(af),
×
206
                                }
×
207
                        )
×
208
                        klog.Infof("add policy route for router: %s, match %s, action %s, nexthop %s, externalID %v", c.config.ClusterRouter, match, action, ip, externalIDs)
×
209
                        if err = c.addPolicyRouteToVpc(
×
210
                                c.config.ClusterRouter,
×
211
                                &kubeovnv1.PolicyRoute{
×
212
                                        Priority:  util.NodeRouterPolicyPriority,
×
213
                                        Match:     match,
×
214
                                        Action:    action,
×
215
                                        NextHopIP: ip,
×
216
                                },
×
217
                                externalIDs,
×
218
                        ); err != nil {
×
219
                                klog.Errorf("failed to add logical router policy for node %s: %v", node.Name, err)
×
220
                                return err
×
221
                        }
×
222

223
                        dnsIPs := make([]string, 0, len(c.config.NodeLocalDNSIPs))
×
224
                        for _, ip := range c.config.NodeLocalDNSIPs {
×
225
                                if util.CheckProtocol(ip) == protocol {
×
226
                                        dnsIPs = append(dnsIPs, ip)
×
227
                                }
×
228
                        }
229

230
                        if err = c.addPolicyRouteForLocalDNSCacheOnNode(dnsIPs, portName, ip, node.Name, af); err != nil {
×
231
                                klog.Errorf("failed to add policy route for node %s: %v", node.Name, err)
×
232
                                return err
×
233
                        }
×
234
                }
235
        }
236

237
        if err := c.addNodeGatewayStaticRoute(); err != nil {
×
238
                klog.Errorf("failed to add static route for node gw: %v", err)
×
239
                return err
×
240
        }
×
241

242
        patch := util.KVPatch{
×
243
                util.IPAddressAnnotation:     ipStr,
×
244
                util.MacAddressAnnotation:    mac,
×
245
                util.CidrAnnotation:          subnet.Spec.CIDRBlock,
×
246
                util.GatewayAnnotation:       subnet.Spec.Gateway,
×
247
                util.LogicalSwitchAnnotation: c.config.NodeSwitch,
×
248
                util.AllocatedAnnotation:     "true",
×
249
                util.PortNameAnnotation:      portName,
×
250
        }
×
251
        if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
252
                klog.Errorf("failed to update annotations of node %s: %v", node.Name, err)
×
253
                return err
×
254
        }
×
255

256
        if err := c.createOrUpdateIPCR("", "", ipStr, mac, c.config.NodeSwitch, "", node.Name, ""); err != nil {
×
257
                klog.Errorf("failed to create or update IPs %s: %v", portName, err)
×
258
                return err
×
259
        }
×
260

261
        for _, subnet := range subnets {
×
262
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWDistributedType {
×
263
                        continue
×
264
                }
265
                if err = c.createPortGroupForDistributedSubnet(node, subnet); err != nil {
×
266
                        klog.Errorf("failed to create port group for node %s and subnet %s: %v", node.Name, subnet.Name, err)
×
267
                        return err
×
268
                }
×
269
                // policy route for overlay distributed subnet should be reconciled when node ip changed
270
                c.addOrUpdateSubnetQueue.Add(subnet.Name)
×
271
        }
272

273
        // ovn acl doesn't support address_set name with '-', so replace '-' by '.'
274
        pgName := strings.ReplaceAll(portName, "-", ".")
×
275
        if err = c.OVNNbClient.CreatePortGroup(pgName, map[string]string{networkPolicyKey: "node" + "/" + key}); err != nil {
×
276
                klog.Errorf("create port group %s for node %s: %v", pgName, key, err)
×
277
                return err
×
278
        }
×
279

280
        if err := c.addPolicyRouteForCentralizedSubnetOnNode(node.Name, ipStr); err != nil {
×
281
                klog.Errorf("failed to add policy route for node %s, %v", key, err)
×
282
                return err
×
283
        }
×
284

285
        if err := c.UpdateChassisTag(node); err != nil {
×
286
                klog.Errorf("failed to update chassis tag for node %s: %v", node.Name, err)
×
287
                return err
×
288
        }
×
289

290
        if err := c.retryDelDupChassis(util.ChassisRetryMaxTimes, util.ChassisControllerRetryInterval, c.cleanDuplicatedChassis, node); err != nil {
×
291
                klog.Errorf("failed to clean duplicated chassis for node %s: %v", node.Name, err)
×
292
                return err
×
293
        }
×
294
        return nil
×
295
}
296

297
func (c *Controller) handleNodeAnnotationsForProviderNetworks(node *v1.Node) error {
×
298
        providerNetworks, err := c.providerNetworksLister.List(labels.Everything())
×
299
        if err != nil && !k8serrors.IsNotFound(err) {
×
300
                klog.Errorf("failed to list provider networks: %v", err)
×
301
                return err
×
302
        }
×
303

304
        for _, pn := range providerNetworks {
×
305
                excludeAnno := fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name)
×
306
                interfaceAnno := fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)
×
307

×
308
                var newPn *kubeovnv1.ProviderNetwork
×
309
                excluded := slices.Contains(pn.Spec.ExcludeNodes, node.Name)
×
310
                if !excluded && len(node.Annotations) != 0 && node.Annotations[excludeAnno] == "true" {
×
311
                        newPn = pn.DeepCopy()
×
312
                        newPn.Spec.ExcludeNodes = append(newPn.Spec.ExcludeNodes, node.Name)
×
313
                        excluded = true
×
314
                }
×
315

316
                var customInterface string
×
317
                for _, v := range pn.Spec.CustomInterfaces {
×
318
                        if slices.Contains(v.Nodes, node.Name) {
×
319
                                customInterface = v.Interface
×
320
                                break
×
321
                        }
322
                }
323
                if customInterface == "" && len(node.Annotations) != 0 {
×
324
                        if customInterface = node.Annotations[interfaceAnno]; customInterface != "" {
×
325
                                if newPn == nil {
×
326
                                        newPn = pn.DeepCopy()
×
327
                                }
×
328
                                var index *int
×
329
                                for i := range newPn.Spec.CustomInterfaces {
×
330
                                        if newPn.Spec.CustomInterfaces[i].Interface == customInterface {
×
331
                                                index = &i
×
332
                                                break
×
333
                                        }
334
                                }
335
                                if index != nil {
×
336
                                        newPn.Spec.CustomInterfaces[*index].Nodes = append(newPn.Spec.CustomInterfaces[*index].Nodes, node.Name)
×
337
                                } else {
×
338
                                        ci := kubeovnv1.CustomInterface{Interface: customInterface, Nodes: []string{node.Name}}
×
339
                                        newPn.Spec.CustomInterfaces = append(newPn.Spec.CustomInterfaces, ci)
×
340
                                }
×
341
                        }
342
                }
343

344
                if newPn != nil {
×
345
                        if newPn, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Update(context.Background(), newPn, metav1.UpdateOptions{}); err != nil {
×
346
                                klog.Errorf("failed to update provider network %s: %v", pn.Name, err)
×
347
                                return err
×
348
                        }
×
349
                }
350

351
                if len(node.Annotations) != 0 {
×
352
                        patch := util.KVPatch{excludeAnno: nil, interfaceAnno: nil}
×
353
                        if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
354
                                klog.Errorf("failed to patch node %s: %v", node.Name, err)
×
355
                                return err
×
356
                        }
×
357
                }
358

359
                if excluded {
×
360
                        if newPn == nil {
×
361
                                newPn = pn.DeepCopy()
×
362
                        } else {
×
363
                                newPn = newPn.DeepCopy()
×
364
                        }
×
365

366
                        if newPn.Status.EnsureNodeStandardConditions(node.Name) {
×
367
                                _, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().UpdateStatus(context.Background(), newPn, metav1.UpdateOptions{})
×
368
                                if err != nil {
×
369
                                        klog.Errorf("failed to update status of provider network %s: %v", pn.Name, err)
×
370
                                        return err
×
371
                                }
×
372
                        }
373
                }
374
        }
375

376
        return nil
×
377
}
378

379
func (c *Controller) handleDeleteNode(key string) (err error) {
×
380
        c.nodeKeyMutex.LockKey(key)
×
381
        defer func() {
×
382
                _ = c.nodeKeyMutex.UnlockKey(key)
×
383
                if err == nil {
×
384
                        c.deletingNodeObjMap.Delete(key)
×
385
                }
×
386
        }()
387
        klog.Infof("handle delete node %s", key)
×
388

×
389
        node, ok := c.deletingNodeObjMap.Load(key)
×
390
        if !ok {
×
391
                return nil
×
392
        }
×
393
        n, _ := c.nodesLister.Get(key)
×
394
        if n != nil && n.UID != node.UID {
×
395
                klog.Warningf("Node %s is adding, skip the node delete handler, but it may leave some gc resources behind", key)
×
396
                return nil
×
397
        }
×
398
        return c.deleteNode(key)
×
399
}
400

401
func (c *Controller) deleteNode(key string) error {
×
402
        portName := util.NodeLspName(key)
×
403
        klog.Infof("delete logical switch port %s", portName)
×
404
        if err := c.OVNNbClient.DeleteLogicalSwitchPort(portName); err != nil {
×
405
                klog.Errorf("failed to delete node switch port %s: %v", portName, err)
×
406
                return err
×
407
        }
×
408
        if err := c.OVNSbClient.DeleteChassisByHost(key); err != nil {
×
409
                klog.Errorf("failed to delete chassis for node %s: %v", key, err)
×
410
                return err
×
411
        }
×
412

413
        for _, af := range [...]int{4, 6} {
×
414
                if err := c.deletePolicyRouteForLocalDNSCacheOnNode(key, af); err != nil {
×
415
                        klog.Error(err)
×
416
                        return err
×
417
                }
×
418
        }
419

420
        // ovn acl doesn't support address_set name with '-', so replace '-' by '.'
421
        pgName := strings.ReplaceAll(portName, "-", ".")
×
422
        if err := c.OVNNbClient.DeletePortGroup(pgName); err != nil {
×
423
                klog.Errorf("delete port group %s for node: %v", portName, err)
×
424
                return err
×
425
        }
×
426

427
        if err := c.deletePolicyRouteForNode(key, portName); err != nil {
×
428
                klog.Errorf("failed to delete policy route for node %s: %v", key, err)
×
429
                return err
×
430
        }
×
431

432
        if err := c.OVNNbClient.DeleteAddressSet(nodeUnderlayAddressSetName(key, 4)); err != nil {
×
433
                klog.Errorf("failed to delete address set for node %s: %v", key, err)
×
434
                return err
×
435
        }
×
436
        if err := c.OVNNbClient.DeleteAddressSet(nodeUnderlayAddressSetName(key, 6)); err != nil {
×
437
                klog.Errorf("failed to delete address set for node %s: %v", key, err)
×
438
                return err
×
439
        }
×
440

441
        klog.Infof("release node port %s", portName)
×
442
        c.ipam.ReleaseAddressByPod(portName, c.config.NodeSwitch)
×
443

×
444
        providerNetworks, err := c.providerNetworksLister.List(labels.Everything())
×
445
        if err != nil && !k8serrors.IsNotFound(err) {
×
446
                klog.Errorf("failed to list provider networks: %v", err)
×
447
                return err
×
448
        }
×
449

450
        for _, pn := range providerNetworks {
×
451
                if err = c.updateProviderNetworkForNodeDeletion(pn, key); err != nil {
×
452
                        klog.Error(err)
×
453
                        return err
×
454
                }
×
455
        }
456
        klog.Infof("delete node ip %s", portName)
×
457
        if err = c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), portName, metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) {
×
458
                return err
×
459
        }
×
460

NEW
461
        if c.config.EnableLb {
×
NEW
462
                if err = c.OVNNbClient.DeleteChassisTemplateVarByNodeName(key); err != nil {
×
NEW
463
                        err = fmt.Errorf("failed to delete ovn-nb Chassis_Template_Var for node %s: %w", key, err)
×
NEW
464
                        klog.Error(err)
×
NEW
465
                        return err
×
NEW
466
                }
×
467
        }
468

UNCOV
469
        return nil
×
470
}
471

472
func (c *Controller) updateProviderNetworkForNodeDeletion(pn *kubeovnv1.ProviderNetwork, node string) error {
×
473
        // update provider network status
×
474
        var needUpdate bool
×
475
        newPn := pn.DeepCopy()
×
476
        if slices.Contains(newPn.Status.ReadyNodes, node) {
×
477
                newPn.Status.ReadyNodes = util.RemoveString(newPn.Status.ReadyNodes, node)
×
478
                needUpdate = true
×
479
        }
×
480
        if newPn.Status.RemoveNodeConditions(node) {
×
481
                needUpdate = true
×
482
        }
×
483
        if needUpdate {
×
484
                var err error
×
485
                newPn, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().UpdateStatus(context.Background(), newPn, metav1.UpdateOptions{})
×
486
                if err != nil {
×
487
                        klog.Errorf("failed to update status of provider network %s: %v", pn.Name, err)
×
488
                        return err
×
489
                }
×
490
        }
491

492
        // update provider network spec
493
        pn, newPn = newPn, nil
×
494
        if excludeNodes := util.RemoveString(pn.Spec.ExcludeNodes, node); len(excludeNodes) != len(pn.Spec.ExcludeNodes) {
×
495
                newPn = pn.DeepCopy()
×
496
                newPn.Spec.ExcludeNodes = excludeNodes
×
497
        }
×
498

499
        var changed bool
×
500
        customInterfaces := make([]kubeovnv1.CustomInterface, 0, len(pn.Spec.CustomInterfaces))
×
501
        for _, ci := range pn.Spec.CustomInterfaces {
×
502
                nodes := util.RemoveString(ci.Nodes, node)
×
503
                if !changed {
×
504
                        changed = len(nodes) == 0 || len(nodes) != len(ci.Nodes)
×
505
                }
×
506
                if len(nodes) != 0 {
×
507
                        customInterfaces = append(customInterfaces, kubeovnv1.CustomInterface{Interface: ci.Interface, Nodes: nodes})
×
508
                }
×
509
        }
510
        if changed {
×
511
                newPn = pn.DeepCopy()
×
512
                newPn.Spec.CustomInterfaces = customInterfaces
×
513
        }
×
514
        if newPn != nil {
×
515
                if _, err := c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Update(context.Background(), newPn, metav1.UpdateOptions{}); err != nil {
×
516
                        klog.Errorf("failed to update provider network %s: %v", pn.Name, err)
×
517
                        return err
×
518
                }
×
519
        }
520

521
        return nil
×
522
}
523

524
func (c *Controller) handleUpdateNode(key string) error {
×
525
        c.nodeKeyMutex.LockKey(key)
×
526
        defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
×
527
        klog.Infof("handle update node %s", key)
×
528

×
529
        node, err := c.nodesLister.Get(key)
×
530
        if err != nil {
×
531
                if k8serrors.IsNotFound(err) {
×
532
                        return nil
×
533
                }
×
534
                klog.Errorf("failed to get node %s: %v", key, err)
×
535
                return err
×
536
        }
537

538
        if err = c.handleNodeAnnotationsForProviderNetworks(node); err != nil {
×
539
                klog.Errorf("failed to handle annotations of node %s for provider networks: %v", node.Name, err)
×
540
                return err
×
541
        }
×
542

543
        subnets, err := c.subnetsLister.List(labels.Everything())
×
544
        if err != nil {
×
545
                klog.Errorf("failed to get subnets %v", err)
×
546
                return err
×
547
        }
×
548

549
        if err := c.UpdateChassisTag(node); err != nil {
×
550
                klog.Errorf("failed to update chassis tag for node %s: %v", node.Name, err)
×
551
                return err
×
552
        }
×
553
        if err := c.retryDelDupChassis(util.ChassisRetryMaxTimes, util.ChassisControllerRetryInterval, c.cleanDuplicatedChassis, node); err != nil {
×
554
                klog.Errorf("failed to clean duplicated chassis for node %s: %v", node.Name, err)
×
555
                return err
×
556
        }
×
557

558
        for _, cachedSubnet := range subnets {
×
559
                subnet := cachedSubnet.DeepCopy()
×
560
                if util.GatewayContains(subnet.Spec.GatewayNode, node.Name) {
×
561
                        if err := c.reconcileOvnDefaultVpcRoute(subnet); err != nil {
×
562
                                klog.Error(err)
×
563
                                return err
×
564
                        }
×
565
                }
566
        }
567

568
        return nil
×
569
}
570

571
func (c *Controller) checkSubnetGateway() {
×
572
        if err := c.checkSubnetGatewayNode(); err != nil {
×
573
                klog.Errorf("failed to check subnet gateway node: %v", err)
×
574
        }
×
575
}
576

577
func (c *Controller) checkSubnetGatewayNode() error {
×
578
        klog.V(3).Infoln("start to check subnet gateway node")
×
579
        subnetList, err := c.subnetsLister.List(labels.Everything())
×
580
        if err != nil {
×
581
                klog.Errorf("failed to list subnets: %v", err)
×
582
                return err
×
583
        }
×
584
        nodes, err := c.nodesLister.List(labels.Everything())
×
585
        if err != nil {
×
586
                klog.Errorf("failed to list nodes: %v", err)
×
587
                return err
×
588
        }
×
589

590
        for _, subnet := range subnetList {
×
591
                if (subnet.Spec.Vlan != "" && (subnet.Spec.U2OInterconnection || !subnet.Spec.LogicalGateway)) ||
×
592
                        subnet.Spec.Vpc != c.config.ClusterRouter ||
×
593
                        subnet.Name == c.config.NodeSwitch ||
×
594
                        subnet.Spec.GatewayNode == "" ||
×
595
                        subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType ||
×
596
                        !subnet.Spec.EnableEcmp {
×
597
                        continue
×
598
                }
599
                gwNodes := strings.Split(subnet.Spec.GatewayNode, ",")
×
600
                if len(gwNodes) < 2 {
×
601
                        continue
×
602
                }
603

604
                for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
×
605
                        nextHops, nameIPMap, err := c.getPolicyRouteParas(cidrBlock, util.GatewayRouterPolicyPriority)
×
606
                        if err != nil {
×
607
                                klog.Errorf("failed to get ecmp policy route paras for subnet %s: %v", subnet.Name, err)
×
608
                                continue
×
609
                        }
610
                        for _, node := range nodes {
×
611
                                ipStr := node.Annotations[util.IPAddressAnnotation]
×
612
                                for _, ip := range strings.Split(ipStr, ",") {
×
613
                                        if util.CheckProtocol(cidrBlock) != util.CheckProtocol(ip) {
×
614
                                                continue
×
615
                                        }
616

617
                                        exist := nameIPMap[node.Name] == ip
×
618
                                        if util.GatewayContains(subnet.Spec.GatewayNode, node.Name) {
×
619
                                                pinger, err := goping.NewPinger(ip)
×
620
                                                if err != nil {
×
621
                                                        return fmt.Errorf("failed to init pinger, %w", err)
×
622
                                                }
×
623
                                                pinger.SetPrivileged(true)
×
624

×
625
                                                count := 5
×
626
                                                pinger.Count = count
×
627
                                                pinger.Timeout = time.Duration(count) * time.Second
×
628
                                                pinger.Interval = 1 * time.Second
×
629

×
630
                                                var pingSucceeded bool
×
631
                                                pinger.OnRecv = func(_ *goping.Packet) {
×
632
                                                        pingSucceeded = true
×
633
                                                        pinger.Stop()
×
634
                                                }
×
635
                                                if err = pinger.Run(); err != nil {
×
636
                                                        klog.Errorf("failed to run pinger for destination %s: %v", ip, err)
×
637
                                                        return err
×
638
                                                }
×
639

640
                                                nodeIsReady := nodeReady(node)
×
641
                                                if !pingSucceeded || !nodeIsReady {
×
642
                                                        if exist {
×
643
                                                                if !pingSucceeded {
×
644
                                                                        klog.Warningf("failed to ping ovn0 ip %s on node %s", ip, node.Name)
×
645
                                                                }
×
646
                                                                if !nodeIsReady {
×
647
                                                                        klog.Warningf("node %s is not ready", node.Name)
×
648
                                                                }
×
649
                                                                klog.Warningf("delete ecmp policy route for node %s ip %s", node.Name, ip)
×
650
                                                                nextHops.Remove(ip)
×
651
                                                                delete(nameIPMap, node.Name)
×
652
                                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
653
                                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
654
                                                                        klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
×
655
                                                                        return err
×
656
                                                                }
×
657
                                                        }
658
                                                } else {
×
659
                                                        klog.V(3).Infof("succeeded to ping ovn0 ip %s on node %s", ip, node.Name)
×
660
                                                        if !exist {
×
661
                                                                nextHops.Add(ip)
×
662
                                                                if nameIPMap == nil {
×
663
                                                                        nameIPMap = make(map[string]string, 1)
×
664
                                                                }
×
665
                                                                nameIPMap[node.Name] = ip
×
666
                                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
667
                                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
668
                                                                        klog.Errorf("failed to add ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
×
669
                                                                        return err
×
670
                                                                }
×
671
                                                        }
672
                                                }
673
                                        } else if exist {
×
674
                                                klog.Infof("subnet %s gateway nodes does not contain node %s, delete policy route for node ip %s", subnet.Name, node.Name, ip)
×
675
                                                nextHops.Remove(ip)
×
676
                                                delete(nameIPMap, node.Name)
×
677
                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
678
                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
679
                                                        klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
×
680
                                                        return err
×
681
                                                }
×
682
                                        }
683
                                }
684
                        }
685
                }
686
        }
687
        return nil
×
688
}
689

690
func (c *Controller) cleanDuplicatedChassis(node *v1.Node) error {
×
691
        // if multi chassis has the same node name, delete all of them
×
692
        var err error
×
693
        if _, err := c.OVNSbClient.GetChassisByHost(node.Name); err == nil {
×
694
                return nil
×
695
        }
×
696
        klog.Errorf("failed to get chassis for node %s: %v", node.Name, err)
×
697
        if errors.Is(err, ovs.ErrOneNodeMultiChassis) {
×
698
                klog.Warningf("node %s has multiple chassis", node.Name)
×
699
                if err := c.OVNSbClient.DeleteChassisByHost(node.Name); err != nil {
×
700
                        klog.Errorf("failed to delete chassis for node %s: %v", node.Name, err)
×
701
                        return err
×
702
                }
×
703
        }
704
        return err
×
705
}
706

707
func (c *Controller) retryDelDupChassis(attempts, sleep int, f func(node *v1.Node) error, node *v1.Node) (err error) {
×
708
        i := 0
×
709
        for ; ; i++ {
×
710
                err = f(node)
×
711
                if err == nil {
×
712
                        return
×
713
                }
×
714
                klog.Errorf("failed to delete duplicated chassis for node %s: %v", node.Name, err)
×
715
                if i >= (attempts - 1) {
×
716
                        break
×
717
                }
718
                time.Sleep(time.Duration(sleep) * time.Second)
×
719
        }
720
        if i >= (attempts - 1) {
×
721
                errMsg := errors.New("exhausting all attempts")
×
722
                klog.Error(errMsg)
×
723
                return errMsg
×
724
        }
×
725
        klog.V(3).Infof("finish check chassis")
×
726
        return nil
×
727
}
728

729
func (c *Controller) fetchPodsOnNode(nodeName string, pods []*v1.Pod) ([]string, error) {
×
730
        ports := make([]string, 0, len(pods))
×
731
        for _, pod := range pods {
×
732
                if !isPodAlive(pod) || pod.Spec.HostNetwork || pod.Spec.NodeName != nodeName || pod.Annotations[util.LogicalRouterAnnotation] != c.config.ClusterRouter {
×
733
                        continue
×
734
                }
735
                podName := c.getNameByPod(pod)
×
736

×
737
                podNets, err := c.getPodKubeovnNets(pod)
×
738
                if err != nil {
×
739
                        klog.Errorf("failed to get pod nets %v", err)
×
740
                        return nil, err
×
741
                }
×
742

743
                for _, podNet := range podNets {
×
744
                        if !isOvnSubnet(podNet.Subnet) {
×
745
                                continue
×
746
                        }
747

748
                        if pod.Annotations != nil && pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "true" {
×
749
                                ports = append(ports, ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName))
×
750
                        }
×
751
                }
752
        }
753
        return ports, nil
×
754
}
755

756
func (c *Controller) CheckNodePortGroup() {
×
757
        if err := c.checkAndUpdateNodePortGroup(); err != nil {
×
758
                klog.Errorf("check node port group status: %v", err)
×
759
        }
×
760
}
761

762
func (c *Controller) checkAndUpdateNodePortGroup() error {
×
763
        klog.V(3).Infoln("start to check node port-group status")
×
764
        np, _ := c.npsLister.List(labels.Everything())
×
765
        networkPolicyExists := len(np) != 0
×
766

×
767
        nodes, err := c.nodesLister.List(labels.Everything())
×
768
        if err != nil {
×
769
                klog.Errorf("list nodes: %v", err)
×
770
                return err
×
771
        }
×
772

773
        pods, err := c.podsLister.List(labels.Everything())
×
774
        if err != nil {
×
775
                klog.Errorf("list pods, %v", err)
×
776
                return err
×
777
        }
×
778

779
        for _, node := range nodes {
×
780
                // The port-group should already created when add node
×
781
                pgName := strings.ReplaceAll(node.Annotations[util.PortNameAnnotation], "-", ".")
×
782

×
783
                // use join IP only when no internal IP exists
×
784
                nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
×
785
                joinIP := node.Annotations[util.IPAddressAnnotation]
×
786
                joinIPv4, joinIPv6 := util.SplitStringIP(joinIP)
×
787
                if nodeIPv4 == "" {
×
788
                        nodeIPv4 = joinIPv4
×
789
                }
×
790
                if nodeIPv6 == "" {
×
791
                        nodeIPv6 = joinIPv6
×
792
                }
×
793
                nodeIP := strings.Trim(fmt.Sprintf("%s,%s", nodeIPv4, nodeIPv6), ",")
×
794

×
795
                nodePorts, err := c.fetchPodsOnNode(node.Name, pods)
×
796
                if err != nil {
×
797
                        klog.Errorf("fetch pods for node %v: %v", node.Name, err)
×
798
                        return err
×
799
                }
×
800

801
                if err = c.OVNNbClient.PortGroupSetPorts(pgName, nodePorts); err != nil {
×
802
                        klog.Errorf("add ports to port group %s: %v", pgName, err)
×
803
                        return err
×
804
                }
×
805

806
                if networkPolicyExists {
×
807
                        if err := c.OVNNbClient.CreateNodeACL(pgName, nodeIP, joinIP); err != nil {
×
808
                                klog.Errorf("create node acl for node pg %s: %v", pgName, err)
×
809
                        }
×
810
                } else {
×
811
                        // clear all acl
×
812
                        if err = c.OVNNbClient.DeleteAcls(pgName, portGroupKey, "", nil); err != nil {
×
813
                                klog.Errorf("delete node acl for node pg %s: %v", pgName, err)
×
814
                        }
×
815
                }
816
        }
817

818
        return nil
×
819
}
820

821
func (c *Controller) UpdateChassisTag(node *v1.Node) error {
×
822
        annoChassisName := node.Annotations[util.ChassisAnnotation]
×
823
        if annoChassisName == "" {
×
824
                // kube-ovn-cni not ready to set chassis
×
825
                return nil
×
826
        }
×
827
        chassis, err := c.OVNSbClient.GetChassis(annoChassisName, true)
×
828
        if err != nil {
×
829
                klog.Errorf("failed to get chassis %s for node %s: %v", annoChassisName, node.Name, err)
×
830
                return err
×
831
        }
×
832
        if chassis == nil {
×
833
                klog.Infof("chassis %q not registered for node %s, do chassis gc once", annoChassisName, node.Name)
×
834
                // chassis name conflict, do GC
×
835
                if err = c.gcChassis(); err != nil {
×
836
                        klog.Errorf("failed to gc chassis: %v", err)
×
837
                        return err
×
838
                }
×
839
                err = &ErrChassisNotFound{Chassis: annoChassisName, Node: node.Name}
×
840
                klog.Error(err)
×
841
                return err
×
842
        }
843

844
        if chassis.ExternalIDs == nil || chassis.ExternalIDs["vendor"] != util.CniTypeName {
×
845
                klog.Infof("init tag %s for node %s chassis %s", util.CniTypeName, node.Name, chassis.Name)
×
846
                if err = c.OVNSbClient.UpdateChassisTag(chassis.Name, node.Name); err != nil {
×
847
                        err := fmt.Errorf("failed to init chassis tag, %w", err)
×
848
                        klog.Error(err)
×
849
                        return err
×
850
                }
×
851
        }
852
        return nil
×
853
}
854

855
func (c *Controller) addNodeGatewayStaticRoute() error {
×
856
        // If user not manage static route for default vpc, just add route about ovn-default to join
×
857
        if vpc, err := c.vpcsLister.Get(c.config.ClusterRouter); err != nil || vpc.Spec.StaticRoutes != nil {
×
858
                existRoute, err := c.OVNNbClient.ListLogicalRouterStaticRoutes(c.config.ClusterRouter, nil, nil, "", nil)
×
859
                if err != nil {
×
860
                        klog.Errorf("failed to get vpc %s static route list, %v", c.config.ClusterRouter, err)
×
861
                }
×
862
                if len(existRoute) != 0 {
×
863
                        klog.Infof("skip add static route for node gw")
×
864
                        return nil
×
865
                }
×
866
        }
867
        dstCidr := "0.0.0.0/0,::/0"
×
868
        for _, cidrBlock := range strings.Split(dstCidr, ",") {
×
869
                for _, nextHop := range strings.Split(c.config.NodeSwitchGateway, ",") {
×
870
                        if util.CheckProtocol(cidrBlock) != util.CheckProtocol(nextHop) {
×
871
                                continue
×
872
                        }
873

874
                        if err := c.addStaticRouteToVpc(
×
875
                                c.config.ClusterRouter,
×
876
                                &kubeovnv1.StaticRoute{
×
877
                                        Policy:     kubeovnv1.PolicyDst,
×
878
                                        CIDR:       cidrBlock,
×
879
                                        NextHopIP:  nextHop,
×
880
                                        RouteTable: util.MainRouteTable,
×
881
                                },
×
882
                        ); err != nil {
×
883
                                klog.Errorf("failed to add static route for node gw: %v", err)
×
884
                                return err
×
885
                        }
×
886
                }
887
        }
888
        return nil
×
889
}
890

891
func (c *Controller) getPolicyRouteParas(cidr string, priority int) (*strset.Set, map[string]string, error) {
×
892
        ipSuffix := "ip4"
×
893
        if util.CheckProtocol(cidr) == kubeovnv1.ProtocolIPv6 {
×
894
                ipSuffix = "ip6"
×
895
        }
×
896
        match := fmt.Sprintf("%s.src == %s", ipSuffix, cidr)
×
897
        policyList, err := c.OVNNbClient.GetLogicalRouterPolicy(c.config.ClusterRouter, priority, match, true)
×
898
        if err != nil {
×
899
                klog.Errorf("failed to get logical router policy: %v", err)
×
900
                return nil, nil, err
×
901
        }
×
902
        if len(policyList) == 0 {
×
903
                return strset.New(), map[string]string{}, nil
×
904
        }
×
905
        return strset.New(policyList[0].Nexthops...), policyList[0].ExternalIDs, nil
×
906
}
907

908
func (c *Controller) deletePolicyRouteForNode(nodeName, portName string) error {
×
909
        subnets, err := c.subnetsLister.List(labels.Everything())
×
910
        if err != nil {
×
911
                klog.Errorf("get subnets: %v", err)
×
912
                return err
×
913
        }
×
914

915
        addresses := c.ipam.GetPodAddress(portName)
×
916
        for _, addr := range addresses {
×
917
                if addr.IP == "" {
×
918
                        continue
×
919
                }
920
                klog.Infof("deleting logical router policy with nexthop %q from %s for node %s", addr.IP, c.config.ClusterRouter, nodeName)
×
921
                if err = c.OVNNbClient.DeleteLogicalRouterPolicyByNexthop(c.config.ClusterRouter, util.NodeRouterPolicyPriority, addr.IP); err != nil {
×
922
                        klog.Errorf("failed to delete logical router policy with nexthop %q from %s for node %s: %v", addr.IP, c.config.ClusterRouter, nodeName, err)
×
923
                        return err
×
924
                }
×
925
        }
926

927
        for _, subnet := range subnets {
×
928
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch {
×
929
                        continue
×
930
                }
931

932
                if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
×
933
                        pgName := getOverlaySubnetsPortGroupName(subnet.Name, nodeName)
×
934
                        if err = c.OVNNbClient.DeletePortGroup(pgName); err != nil {
×
935
                                klog.Errorf("delete port group for subnet %s and node %s: %v", subnet.Name, nodeName, err)
×
936
                                return err
×
937
                        }
×
938

939
                        klog.Infof("delete policy route for distributed subnet %s, node %s", subnet.Name, nodeName)
×
940
                        if err = c.deletePolicyRouteForDistributedSubnet(subnet, nodeName); err != nil {
×
941
                                klog.Errorf("delete policy route for subnet %s and node %s: %v", subnet.Name, nodeName, err)
×
942
                                return err
×
943
                        }
×
944
                }
945

946
                if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType {
×
947
                        if subnet.Spec.EnableEcmp {
×
948
                                for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
×
949
                                        nextHops, nameIPMap, err := c.getPolicyRouteParas(cidrBlock, util.GatewayRouterPolicyPriority)
×
950
                                        if err != nil {
×
951
                                                klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
×
952
                                                continue
×
953
                                        }
954

955
                                        exist := false
×
956
                                        if _, ok := nameIPMap[nodeName]; ok {
×
957
                                                exist = true
×
958
                                        }
×
959

960
                                        if exist {
×
961
                                                nextHops.Remove(nameIPMap[nodeName])
×
962
                                                delete(nameIPMap, nodeName)
×
963

×
964
                                                if nextHops.Size() == 0 {
×
965
                                                        klog.Infof("delete policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
966
                                                        if err := c.deletePolicyRouteForCentralizedSubnet(subnet); err != nil {
×
967
                                                                klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
×
968
                                                                return err
×
969
                                                        }
×
970
                                                } else {
×
971
                                                        klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
972
                                                        if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
973
                                                                klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
×
974
                                                                return err
×
975
                                                        }
×
976
                                                }
977
                                        }
978
                                }
979
                        } else {
×
980
                                klog.Infof("reconcile policy route for centralized subnet %s", subnet.Name)
×
981
                                if err := c.reconcileDefaultCentralizedSubnetRouteInDefaultVpc(subnet); err != nil {
×
982
                                        klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
×
983
                                        return err
×
984
                                }
×
985
                        }
986
                }
987
        }
988
        return nil
×
989
}
990

991
func (c *Controller) addPolicyRouteForCentralizedSubnetOnNode(nodeName, nodeIP string) error {
×
992
        subnets, err := c.subnetsLister.List(labels.Everything())
×
993
        if err != nil {
×
994
                klog.Errorf("failed to get subnets %v", err)
×
995
                return err
×
996
        }
×
997

998
        for _, subnet := range subnets {
×
999
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType {
×
1000
                        continue
×
1001
                }
1002

1003
                if subnet.Spec.EnableEcmp {
×
1004
                        if !util.GatewayContains(subnet.Spec.GatewayNode, nodeName) {
×
1005
                                continue
×
1006
                        }
1007

1008
                        for _, nextHop := range strings.Split(nodeIP, ",") {
×
1009
                                for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
×
1010
                                        if util.CheckProtocol(cidrBlock) != util.CheckProtocol(nextHop) {
×
1011
                                                continue
×
1012
                                        }
1013

1014
                                        nextHops, nameIPMap, err := c.getPolicyRouteParas(cidrBlock, util.GatewayRouterPolicyPriority)
×
1015
                                        if err != nil {
×
1016
                                                klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
×
1017
                                                continue
×
1018
                                        }
1019
                                        if nameIPMap[nodeName] == nextHop {
×
1020
                                                continue
×
1021
                                        }
1022

1023
                                        nextHops.Add(nextHop)
×
1024
                                        if nameIPMap == nil {
×
1025
                                                nameIPMap = make(map[string]string, 1)
×
1026
                                        }
×
1027
                                        nameIPMap[nodeName] = nextHop
×
1028
                                        klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
1029
                                        if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
1030
                                                klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
×
1031
                                                return err
×
1032
                                        }
×
1033
                                }
1034
                        }
1035
                } else {
×
1036
                        if subnet.Status.ActivateGateway != nodeName {
×
1037
                                continue
×
1038
                        }
1039
                        klog.Infof("add policy route for centralized subnet %s, on node %s, ip %s", subnet.Name, nodeName, nodeIP)
×
1040
                        if err = c.addPolicyRouteForCentralizedSubnet(subnet, nodeName, nil, strings.Split(nodeIP, ",")); err != nil {
×
1041
                                klog.Errorf("failed to add active-backup policy route for centralized subnet %s: %v", subnet.Name, err)
×
1042
                                return err
×
1043
                        }
×
1044
                }
1045
        }
1046
        return nil
×
1047
}
1048

1049
func (c *Controller) addPolicyRouteForLocalDNSCacheOnNode(dnsIPs []string, nodePortName, nodeIP, nodeName string, af int) error {
×
1050
        if len(dnsIPs) == 0 {
×
1051
                return c.deletePolicyRouteForLocalDNSCacheOnNode(nodeName, af)
×
1052
        }
×
1053

1054
        var (
×
1055
                externalIDs = map[string]string{
×
1056
                        "vendor":          util.CniTypeName,
×
1057
                        "node":            nodeName,
×
1058
                        "address-family":  strconv.Itoa(af),
×
1059
                        "isLocalDnsCache": "true",
×
1060
                }
×
1061
                pgAs     = strings.ReplaceAll(fmt.Sprintf("%s_ip%d", nodePortName, af), "-", ".")
×
1062
                action   = kubeovnv1.PolicyRouteActionReroute
×
1063
                nextHops = []string{nodeIP}
×
1064
        )
×
1065
        matches := strset.NewWithSize(len(dnsIPs))
×
1066
        for _, ip := range dnsIPs {
×
1067
                matches.Add(fmt.Sprintf("ip%d.src == $%s && ip%d.dst == %s", af, pgAs, af, ip))
×
1068
        }
×
1069

1070
        policies, err := c.OVNNbClient.GetLogicalRouterPoliciesByExtID(c.config.ClusterRouter, "node", nodeName)
×
1071
        if err != nil {
×
1072
                klog.Errorf("failed to list logical router policies with external-ids:node = %q: %v", nodeName, err)
×
1073
                return err
×
1074
        }
×
1075

1076
        for _, policy := range policies {
×
1077
                if len(policy.ExternalIDs) == 0 || policy.ExternalIDs["vendor"] != util.CniTypeName || policy.ExternalIDs["isLocalDnsCache"] != "true" {
×
1078
                        continue
×
1079
                }
1080
                if policy.Priority == util.NodeRouterPolicyPriority && policy.Action == string(action) && slices.Equal(policy.Nexthops, nextHops) && matches.Has(policy.Match) {
×
1081
                        matches.Remove(policy.Match)
×
1082
                        continue
×
1083
                }
1084
                // delete unused policy router policy
1085
                klog.Infof("deleting logical router policy by UUID %s", policy.UUID)
×
1086
                if err = c.OVNNbClient.DeleteLogicalRouterPolicyByUUID(c.config.ClusterRouter, policy.UUID); err != nil {
×
1087
                        klog.Errorf("failed to delete logical router policy by UUID %s: %v", policy.UUID, err)
×
1088
                        return err
×
1089
                }
×
1090
        }
1091

1092
        for _, match := range matches.List() {
×
1093
                klog.Infof("add node local dns cache policy route for router %s: match %q, action %q, nexthop %q, externalID %v", c.config.ClusterRouter, match, action, nodeIP, externalIDs)
×
1094
                if err := c.addPolicyRouteToVpc(
×
1095
                        c.config.ClusterRouter,
×
1096
                        &kubeovnv1.PolicyRoute{
×
1097
                                Priority:  util.NodeRouterPolicyPriority,
×
1098
                                Match:     match,
×
1099
                                Action:    action,
×
1100
                                NextHopIP: nodeIP,
×
1101
                        },
×
1102
                        externalIDs,
×
1103
                ); err != nil {
×
1104
                        klog.Errorf("failed to add logical router policy for node %s: %v", nodeName, err)
×
1105
                        return err
×
1106
                }
×
1107
        }
1108

1109
        return nil
×
1110
}
1111

1112
func (c *Controller) deletePolicyRouteForLocalDNSCacheOnNode(nodeName string, af int) error {
×
1113
        policies, err := c.OVNNbClient.ListLogicalRouterPolicies(c.config.ClusterRouter, -1, map[string]string{
×
1114
                "vendor":          util.CniTypeName,
×
1115
                "node":            nodeName,
×
1116
                "address-family":  strconv.Itoa(af),
×
1117
                "isLocalDnsCache": "true",
×
1118
        }, true)
×
1119
        if err != nil {
×
1120
                klog.Errorf("failed to list logical router policies: %v", err)
×
1121
                return err
×
1122
        }
×
1123
        if len(policies) == 0 {
×
1124
                return nil
×
1125
        }
×
1126

1127
        for _, policy := range policies {
×
1128
                klog.Infof("delete node local dns cache policy route for router %s with match %s", c.config.ClusterRouter, policy.Match)
×
1129

×
1130
                if err := c.OVNNbClient.DeleteLogicalRouterPolicyByUUID(c.config.ClusterRouter, policy.UUID); err != nil {
×
1131
                        klog.Errorf("failed to delete policy route for node local dns in router %s with match %s: %v", c.config.ClusterRouter, policy.Match, err)
×
1132
                        return err
×
1133
                }
×
1134
        }
1135
        return nil
×
1136
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc