• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 27750934832

18 Jun 2026 09:43AM UTC coverage: 25.584% (+0.2%) from 25.424%
27750934832

Pull #6901

github

zhangzujian
fix: reconcile vpc bfd ha chassis on node changes

Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>
Pull Request #6901: fix: reconcile VPC BFD HA chassis on node changes

52 of 63 new or added lines in 2 files covered. (82.54%)

1 existing line in 1 file now uncovered.

14545 of 56853 relevant lines covered (25.58%)

0.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.42
/pkg/controller/node.go
1
package controller
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "maps"
8
        "slices"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        goping "github.com/prometheus-community/pro-bing"
14
        "github.com/scylladb/go-set/strset"
15
        v1 "k8s.io/api/core/v1"
16
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
17
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
        "k8s.io/apimachinery/pkg/labels"
19
        "k8s.io/apimachinery/pkg/types"
20
        "k8s.io/client-go/tools/cache"
21
        "k8s.io/klog/v2"
22

23
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
24
        "github.com/kubeovn/kube-ovn/pkg/ovs"
25
        "github.com/kubeovn/kube-ovn/pkg/util"
26
)
27

28
type ErrChassisNotFound struct {
29
        Chassis string
30
        Node    string
31
}
32

33
func (e *ErrChassisNotFound) Error() string {
×
34
        return fmt.Sprintf("chassis %s not found for node %s", e.Chassis, e.Node)
×
35
}
×
36

37
func (c *Controller) enqueueAddNode(obj any) {
1✔
38
        node := obj.(*v1.Node)
1✔
39
        key := cache.MetaObjectToName(node).String()
1✔
40
        klog.V(3).Infof("enqueue add node %s", key)
1✔
41
        c.addNodeQueue.Add(key)
1✔
42
        c.enqueueVpcBFDPortByNodeChange(nil, node)
1✔
43
}
1✔
44

45
func nodeReady(node *v1.Node) bool {
1✔
46
        var ready, networkUnavailable bool
1✔
47
        for _, c := range node.Status.Conditions {
2✔
48
                switch c.Type {
1✔
49
                case v1.NodeReady:
1✔
50
                        ready = c.Status == v1.ConditionTrue
1✔
51
                case v1.NodeNetworkUnavailable:
×
52
                        networkUnavailable = c.Status == v1.ConditionTrue
×
53
                }
54
        }
55
        return ready && !networkUnavailable
1✔
56
}
57

58
func kubeOvnAnnotationsChanged(oldAnnotations, newAnnotations map[string]string) bool {
1✔
59
        filterKubeOvnAnnotations := func(annotations map[string]string) map[string]string {
2✔
60
                filtered := make(map[string]string)
1✔
61
                for key, value := range annotations {
2✔
62
                        if strings.Contains(key, ".kubernetes.io/") {
2✔
63
                                filtered[key] = value
1✔
64
                        }
1✔
65
                }
66
                return filtered
1✔
67
        }
68

69
        return !maps.Equal(filterKubeOvnAnnotations(oldAnnotations), filterKubeOvnAnnotations(newAnnotations))
1✔
70
}
71

72
func (c *Controller) listVpcBFDPorts() ([]*kubeovnv1.Vpc, error) {
1✔
73
        vpcs, err := c.vpcsLister.List(labels.Everything())
1✔
74
        if err != nil {
1✔
NEW
75
                return nil, err
×
NEW
76
        }
×
77

78
        bfdVpcs := make([]*kubeovnv1.Vpc, 0, len(vpcs))
1✔
79
        for _, vpc := range vpcs {
2✔
80
                if vpc.Labels != nil && vpc.Labels[util.VpcExternalLabel] == "true" {
1✔
NEW
81
                        continue
×
82
                }
83
                if !vpc.Spec.BFDPort.IsEnabled() {
1✔
NEW
84
                        continue
×
85
                }
86
                bfdVpcs = append(bfdVpcs, vpc)
1✔
87
        }
88
        return bfdVpcs, nil
1✔
89
}
90

91
func (c *Controller) enqueueVpcBFDPortByNodeChange(oldNode, newNode *v1.Node) {
1✔
92
        vpcs, err := c.listVpcBFDPorts()
1✔
93
        if err != nil {
1✔
NEW
94
                klog.Errorf("failed to list VPC BFD ports for node change: %v", err)
×
NEW
95
                return
×
NEW
96
        }
×
97

98
        for _, vpc := range vpcs {
2✔
99
                selector := labels.Everything()
1✔
100
                if vpc.Spec.BFDPort.NodeSelector != nil {
2✔
101
                        selector, err = metav1.LabelSelectorAsSelector(vpc.Spec.BFDPort.NodeSelector)
1✔
102
                        if err != nil {
1✔
NEW
103
                                klog.Errorf("failed to parse BFD port node selector of vpc %s: %v", vpc.Name, err)
×
NEW
104
                                c.addOrUpdateVpcQueue.Add(vpc.Name)
×
NEW
105
                                continue
×
106
                        }
107
                }
108

109
                oldMatched := oldNode != nil && selector.Matches(labels.Set(oldNode.Labels))
1✔
110
                newMatched := newNode != nil && selector.Matches(labels.Set(newNode.Labels))
1✔
111
                nodeReadyChanged := oldNode != nil && newNode != nil && oldMatched && newMatched && nodeReady(oldNode) != nodeReady(newNode)
1✔
112
                if oldMatched != newMatched || nodeReadyChanged {
2✔
113
                        klog.V(3).Infof("enqueue update vpc %s for BFD port triggered by node change", vpc.Name)
1✔
114
                        c.addOrUpdateVpcQueue.Add(vpc.Name)
1✔
115
                }
1✔
116
        }
117
}
118

119
func (c *Controller) enqueueUpdateNode(oldObj, newObj any) {
1✔
120
        oldNode := oldObj.(*v1.Node)
1✔
121
        newNode := newObj.(*v1.Node)
1✔
122
        nodeReadyChanged := nodeReady(oldNode) != nodeReady(newNode)
1✔
123
        nodeLabelsChanged := !maps.Equal(oldNode.Labels, newNode.Labels)
1✔
124

1✔
125
        key := cache.MetaObjectToName(newNode).String()
1✔
126
        if nodeReadyChanged || kubeOvnAnnotationsChanged(oldNode.Annotations, newNode.Annotations) {
2✔
127
                if len(newNode.Annotations) == 0 || newNode.Annotations[util.AllocatedAnnotation] != "true" {
2✔
128
                        klog.V(3).Infof("enqueue add node %s", key)
1✔
129
                        c.addNodeQueue.Add(key)
1✔
130
                } else {
1✔
131
                        klog.V(3).Infof("enqueue update node %s", key)
×
132
                        c.updateNodeQueue.Add(key)
×
133
                }
×
134
        } else if nodeLabelsChanged && newNode.Annotations[util.AllocatedAnnotation] == "true" {
2✔
135
                klog.V(3).Infof("enqueue update node %s for label change", key)
1✔
136
                c.updateNodeQueue.Add(key)
1✔
137
        }
1✔
138
        if nodeReadyChanged || nodeLabelsChanged {
2✔
139
                c.enqueueVpcBFDPortByNodeChange(oldNode, newNode)
1✔
140
        }
1✔
141
}
142

143
func (c *Controller) enqueueDeleteNode(obj any) {
1✔
144
        var node *v1.Node
1✔
145
        switch t := obj.(type) {
1✔
146
        case *v1.Node:
×
147
                node = t
×
148
        case cache.DeletedFinalStateUnknown:
1✔
149
                n, ok := t.Obj.(*v1.Node)
1✔
150
                if !ok {
1✔
151
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
152
                        return
×
153
                }
×
154
                node = n
1✔
155
        default:
×
156
                klog.Warningf("unexpected type: %T", obj)
×
157
                return
×
158
        }
159

160
        key := cache.MetaObjectToName(node).String()
1✔
161
        klog.V(3).Infof("enqueue delete node %s", key)
1✔
162
        c.deletingNodeObjMap.Store(key, node)
1✔
163
        c.deleteNodeQueue.Add(key)
1✔
164
        c.enqueueVpcBFDPortByNodeChange(node, nil)
1✔
165
}
166

167
func nodeUnderlayAddressSetName(node string, af int) string {
×
168
        return fmt.Sprintf("node_%s_underlay_v%d", strings.ReplaceAll(node, "-", "_"), af)
×
169
}
×
170

171
func (c *Controller) handleAddNode(key string) error {
×
172
        c.nodeKeyMutex.LockKey(key)
×
173
        defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
×
174

175
        cachedNode, err := c.nodesLister.Get(key)
×
176
        if err != nil {
×
177
                if k8serrors.IsNotFound(err) {
×
178
                        return nil
×
179
                }
×
180
                klog.Errorf("failed to get node %s: %v", key, err)
×
181
                return err
×
182
        }
183
        node := cachedNode.DeepCopy()
×
184
        klog.Infof("handle add node %s", node.Name)
×
185

×
186
        subnets, err := c.subnetsLister.List(labels.Everything())
×
187
        if err != nil {
×
188
                klog.Errorf("failed to list subnets: %v", err)
×
189
                return err
×
190
        }
×
191

192
        nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
×
193
        for _, subnet := range subnets {
×
194
                if subnet.Spec.Vpc != c.config.ClusterRouter {
×
195
                        continue
×
196
                }
197

198
                v4, v6 := util.SplitStringIP(subnet.Spec.CIDRBlock)
×
199
                if subnet.Spec.Vlan == "" && (util.CIDRContainIP(v4, nodeIPv4) || util.CIDRContainIP(v6, nodeIPv6)) {
×
200
                        msg := fmt.Sprintf("internal IP address of node %s is in CIDR of subnet %s, this may result in network issues", node.Name, subnet.Name)
×
201
                        klog.Warning(msg)
×
202
                        c.recorder.Eventf(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: node.Name, UID: types.UID(node.Name)}}, v1.EventTypeWarning, "NodeAddressConflictWithSubnet", msg)
×
203
                        break
×
204
                }
205
        }
206

207
        if err = c.handleNodeAnnotationsForProviderNetworks(node); err != nil {
×
208
                klog.Errorf("failed to handle annotations of node %s for provider networks: %v", node.Name, err)
×
209
                return err
×
210
        }
×
211

212
        subnet, err := c.subnetsLister.Get(c.config.NodeSwitch)
×
213
        if err != nil {
×
214
                klog.Errorf("failed to get node subnet: %v", err)
×
215
                return err
×
216
        }
×
217

218
        var v4IP, v6IP, mac string
×
219

×
220
        portName := util.NodeLspName(key)
×
221
        if node.Annotations[util.AllocatedAnnotation] == "true" && node.Annotations[util.IPAddressAnnotation] != "" && node.Annotations[util.MacAddressAnnotation] != "" {
×
222
                macStr := node.Annotations[util.MacAddressAnnotation]
×
223
                v4IP, v6IP, mac, err = c.ipam.GetStaticAddress(portName, portName, node.Annotations[util.IPAddressAnnotation],
×
224
                        &macStr, node.Annotations[util.LogicalSwitchAnnotation], true)
×
225
                if err != nil {
×
226
                        klog.Errorf("failed to alloc static ip addrs for node %v: %v", node.Name, err)
×
227
                        return err
×
228
                }
×
229
        } else {
×
230
                v4IP, v6IP, mac, err = c.ipam.GetRandomAddress(portName, portName, nil, c.config.NodeSwitch, "", nil, true)
×
231
                if err != nil {
×
232
                        klog.Errorf("failed to alloc random ip addrs for node %v: %v", node.Name, err)
×
233
                        return err
×
234
                }
×
235

236
                // Clean up potentially existing logical switch ports to avoid leftover issues from previous failed configurations
237
                if err := c.OVNNbClient.DeleteLogicalSwitchPort(portName); err != nil {
×
238
                        klog.Errorf("failed to delete stale logical switch port %s: %v", portName, err)
×
239
                        return err
×
240
                }
×
241
                klog.Infof("deleted stale logical switch port %s", portName)
×
242
        }
243

244
        ipStr := util.GetStringIP(v4IP, v6IP)
×
245
        if err := c.OVNNbClient.CreateBareLogicalSwitchPort(c.config.NodeSwitch, portName, ipStr, mac); err != nil {
×
246
                klog.Errorf("failed to create logical switch port %s: %v", portName, err)
×
247
                return err
×
248
        }
×
249

250
        for ip := range strings.SplitSeq(ipStr, ",") {
×
251
                if ip == "" {
×
252
                        continue
×
253
                }
254

255
                nodeIP, af := nodeIPv4, 4
×
256
                protocol := util.CheckProtocol(ip)
×
257
                if protocol == kubeovnv1.ProtocolIPv6 {
×
258
                        nodeIP, af = nodeIPv6, 6
×
259
                }
×
260
                if nodeIP != "" {
×
261
                        var (
×
262
                                match       = fmt.Sprintf("ip%d.dst == %s", af, nodeIP)
×
263
                                action      = kubeovnv1.PolicyRouteActionReroute
×
264
                                externalIDs = map[string]string{
×
265
                                        "vendor":         util.CniTypeName,
×
266
                                        "node":           node.Name,
×
267
                                        "address-family": strconv.Itoa(af),
×
268
                                }
×
269
                        )
×
270
                        klog.Infof("add policy route for router: %s, match %s, action %s, nexthop %s, externalID %v", c.config.ClusterRouter, match, action, ip, externalIDs)
×
271
                        if err = c.addPolicyRouteToVpc(
×
272
                                c.config.ClusterRouter,
×
273
                                &kubeovnv1.PolicyRoute{
×
274
                                        Priority:  util.NodeRouterPolicyPriority,
×
275
                                        Match:     match,
×
276
                                        Action:    action,
×
277
                                        NextHopIP: ip,
×
278
                                },
×
279
                                externalIDs,
×
280
                        ); err != nil {
×
281
                                klog.Errorf("failed to add logical router policy for node %s: %v", node.Name, err)
×
282
                                return err
×
283
                        }
×
284

285
                        dnsIPs := make([]string, 0, len(c.config.NodeLocalDNSIPs))
×
286
                        for _, ip := range c.config.NodeLocalDNSIPs {
×
287
                                if util.CheckProtocol(ip) == protocol {
×
288
                                        dnsIPs = append(dnsIPs, ip)
×
289
                                }
×
290
                        }
291

292
                        if err = c.addPolicyRouteForLocalDNSCacheOnNode(dnsIPs, portName, ip, node.Name, af); err != nil {
×
293
                                klog.Errorf("failed to add policy route for node %s: %v", node.Name, err)
×
294
                                return err
×
295
                        }
×
296
                }
297
        }
298

299
        patch := util.KVPatch{
×
300
                util.IPAddressAnnotation:     ipStr,
×
301
                util.MacAddressAnnotation:    mac,
×
302
                util.CidrAnnotation:          subnet.Spec.CIDRBlock,
×
303
                util.GatewayAnnotation:       subnet.Spec.Gateway,
×
304
                util.LogicalSwitchAnnotation: c.config.NodeSwitch,
×
305
                util.AllocatedAnnotation:     "true",
×
306
                util.PortNameAnnotation:      portName,
×
307
        }
×
308
        if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
309
                klog.Errorf("failed to update annotations of node %s: %v", node.Name, err)
×
310
                return err
×
311
        }
×
312

313
        if err := c.createOrUpdateIPCR("", "", ipStr, mac, c.config.NodeSwitch, "", node.Name, ""); err != nil {
×
314
                klog.Errorf("failed to create or update IPs %s: %v", portName, err)
×
315
                return err
×
316
        }
×
317

318
        for _, subnet := range subnets {
×
319
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWDistributedType {
×
320
                        continue
×
321
                }
322
                if err = c.createPortGroupForDistributedSubnet(node, subnet); err != nil {
×
323
                        klog.Errorf("failed to create port group for node %s and subnet %s: %v", node.Name, subnet.Name, err)
×
324
                        return err
×
325
                }
×
326
        }
327
        c.distributedSubnetNeedSync.Store(true)
×
328

×
329
        // ovn acl doesn't support address_set name with '-', so replace '-' by '.'
×
330
        pgName := strings.ReplaceAll(portName, "-", ".")
×
331
        if err = c.OVNNbClient.CreatePortGroup(pgName, map[string]string{"node": node.Name, networkPolicyKey: "node" + "/" + key}); err != nil {
×
332
                klog.Errorf("create port group %s for node %s: %v", pgName, key, err)
×
333
                return err
×
334
        }
×
335

336
        if err := c.addPolicyRouteForCentralizedSubnetOnNode(node, ipStr); err != nil {
×
337
                klog.Errorf("failed to add policy route for node %s, %v", key, err)
×
338
                return err
×
339
        }
×
340

341
        if err := c.UpdateChassisTag(node); err != nil {
×
342
                klog.Errorf("failed to update chassis tag for node %s: %v", node.Name, err)
×
343
                return err
×
344
        }
×
345

346
        if err := c.retryDelDupChassis(util.ChassisRetryMaxTimes, util.ChassisControllerRetryInterval, c.cleanDuplicatedChassis, node); err != nil {
×
347
                klog.Errorf("failed to clean duplicated chassis for node %s: %v", node.Name, err)
×
348
                return err
×
349
        }
×
350
        return nil
×
351
}
352

353
func (c *Controller) handleNodeAnnotationsForProviderNetworks(node *v1.Node) error {
×
354
        providerNetworks, err := c.providerNetworksLister.List(labels.Everything())
×
355
        if err != nil && !k8serrors.IsNotFound(err) {
×
356
                klog.Errorf("failed to list provider networks: %v", err)
×
357
                return err
×
358
        }
×
359

360
        for _, pn := range providerNetworks {
×
361
                excludeAnno := fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name)
×
362
                interfaceAnno := fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)
×
363

×
364
                var newPn *kubeovnv1.ProviderNetwork
×
365
                excluded, err := util.IsNodeExcludedFromProviderNetwork(node, pn)
×
366
                if err != nil {
×
367
                        klog.Error(err)
×
368
                        return err
×
369
                }
×
370

371
                // Handle node annotation for exclusion (only when nodeSelector is not set)
372
                if !excluded && pn.Spec.NodeSelector == nil && len(node.Annotations) != 0 && node.Annotations[excludeAnno] == "true" {
×
373
                        newPn = pn.DeepCopy()
×
374
                        newPn.Spec.ExcludeNodes = append(newPn.Spec.ExcludeNodes, node.Name)
×
375
                        excluded = true
×
376
                }
×
377

378
                var customInterface string
×
379
                for _, v := range pn.Spec.CustomInterfaces {
×
380
                        if slices.Contains(v.Nodes, node.Name) {
×
381
                                customInterface = v.Interface
×
382
                                break
×
383
                        }
384
                }
385
                if customInterface == "" && len(node.Annotations) != 0 {
×
386
                        if customInterface = node.Annotations[interfaceAnno]; customInterface != "" {
×
387
                                if newPn == nil {
×
388
                                        newPn = pn.DeepCopy()
×
389
                                }
×
390
                                var index *int
×
391
                                for i := range newPn.Spec.CustomInterfaces {
×
392
                                        if newPn.Spec.CustomInterfaces[i].Interface == customInterface {
×
393
                                                index = &i
×
394
                                                break
×
395
                                        }
396
                                }
397
                                if index != nil {
×
398
                                        newPn.Spec.CustomInterfaces[*index].Nodes = append(newPn.Spec.CustomInterfaces[*index].Nodes, node.Name)
×
399
                                } else {
×
400
                                        ci := kubeovnv1.CustomInterface{Interface: customInterface, Nodes: []string{node.Name}}
×
401
                                        newPn.Spec.CustomInterfaces = append(newPn.Spec.CustomInterfaces, ci)
×
402
                                }
×
403
                        }
404
                }
405

406
                if newPn != nil {
×
407
                        if newPn, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Update(context.Background(), newPn, metav1.UpdateOptions{}); err != nil {
×
408
                                klog.Errorf("failed to update provider network %s: %v", pn.Name, err)
×
409
                                return err
×
410
                        }
×
411
                }
412

413
                if len(node.Annotations) != 0 {
×
414
                        patch := util.KVPatch{excludeAnno: nil, interfaceAnno: nil}
×
415
                        if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
416
                                klog.Errorf("failed to patch node %s: %v", node.Name, err)
×
417
                                return err
×
418
                        }
×
419
                }
420

421
                if excluded {
×
422
                        if newPn == nil {
×
423
                                newPn = pn.DeepCopy()
×
424
                        } else {
×
425
                                newPn = newPn.DeepCopy()
×
426
                        }
×
427

428
                        if newPn.Status.EnsureNodeStandardConditions(node.Name) {
×
429
                                _, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().UpdateStatus(context.Background(), newPn, metav1.UpdateOptions{})
×
430
                                if err != nil {
×
431
                                        klog.Errorf("failed to update status of provider network %s: %v", pn.Name, err)
×
432
                                        return err
×
433
                                }
×
434
                        }
435
                }
436
        }
437

438
        return nil
×
439
}
440

441
func (c *Controller) handleDeleteNode(key string) (err error) {
×
442
        c.nodeKeyMutex.LockKey(key)
×
443
        defer func() {
×
444
                _ = c.nodeKeyMutex.UnlockKey(key)
×
445
                if err == nil {
×
446
                        c.deletingNodeObjMap.Delete(key)
×
447
                }
×
448
        }()
449
        klog.Infof("handle delete node %s", key)
×
450

×
451
        node, ok := c.deletingNodeObjMap.Load(key)
×
452
        if !ok {
×
453
                return nil
×
454
        }
×
455
        n, _ := c.nodesLister.Get(key)
×
456
        if n != nil && n.UID != node.UID {
×
457
                klog.Warningf("Node %s is adding, skip the node delete handler, but it may leave some gc resources behind", key)
×
458
                return nil
×
459
        }
×
460
        return c.deleteNode(key)
×
461
}
462

463
func (c *Controller) deleteNode(key string) error {
×
464
        portName := util.NodeLspName(key)
×
465
        klog.Infof("delete logical switch port %s", portName)
×
466
        if err := c.OVNNbClient.DeleteLogicalSwitchPort(portName); err != nil {
×
467
                klog.Errorf("failed to delete node switch port %s: %v", portName, err)
×
468
                return err
×
469
        }
×
470
        if err := c.OVNSbClient.DeleteChassisByHost(key); err != nil {
×
471
                klog.Errorf("failed to delete chassis for node %s: %v", key, err)
×
472
                return err
×
473
        }
×
474

475
        for _, af := range [...]int{4, 6} {
×
476
                if err := c.deletePolicyRouteForLocalDNSCacheOnNode(key, af); err != nil {
×
477
                        klog.Error(err)
×
478
                        return err
×
479
                }
×
480
        }
481

482
        // ovn acl doesn't support address_set name with '-', so replace '-' by '.'
483
        pgName := strings.ReplaceAll(portName, "-", ".")
×
484
        if err := c.OVNNbClient.DeletePortGroup(pgName); err != nil {
×
485
                klog.Errorf("delete port group %s for node: %v", portName, err)
×
486
                return err
×
487
        }
×
488

489
        if err := c.deletePolicyRouteForNode(key, portName); err != nil {
×
490
                klog.Errorf("failed to delete policy route for node %s: %v", key, err)
×
491
                return err
×
492
        }
×
493

494
        if err := c.OVNNbClient.DeleteAddressSet(nodeUnderlayAddressSetName(key, 4)); err != nil {
×
495
                klog.Errorf("failed to delete address set for node %s: %v", key, err)
×
496
                return err
×
497
        }
×
498
        if err := c.OVNNbClient.DeleteAddressSet(nodeUnderlayAddressSetName(key, 6)); err != nil {
×
499
                klog.Errorf("failed to delete address set for node %s: %v", key, err)
×
500
                return err
×
501
        }
×
502

503
        klog.Infof("release node port %s", portName)
×
504
        c.ipam.ReleaseAddressByPod(portName, c.config.NodeSwitch)
×
505

×
506
        providerNetworks, err := c.providerNetworksLister.List(labels.Everything())
×
507
        if err != nil && !k8serrors.IsNotFound(err) {
×
508
                klog.Errorf("failed to list provider networks: %v", err)
×
509
                return err
×
510
        }
×
511

512
        for _, pn := range providerNetworks {
×
513
                if err = c.updateProviderNetworkForNodeDeletion(pn, key); err != nil {
×
514
                        klog.Error(err)
×
515
                        return err
×
516
                }
×
517
        }
518
        klog.Infof("delete node ip %s", portName)
×
519
        if err = c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), portName, metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) {
×
520
                return err
×
521
        }
×
522

523
        return nil
×
524
}
525

526
func (c *Controller) updateProviderNetworkForNodeDeletion(pn *kubeovnv1.ProviderNetwork, node string) error {
×
527
        // update provider network status
×
528
        var needUpdate bool
×
529
        newPn := pn.DeepCopy()
×
530
        if slices.Contains(newPn.Status.ReadyNodes, node) {
×
531
                newPn.Status.ReadyNodes = util.RemoveString(newPn.Status.ReadyNodes, node)
×
532
                needUpdate = true
×
533
        }
×
534
        if newPn.Status.RemoveNodeConditions(node) {
×
535
                needUpdate = true
×
536
        }
×
537
        if needUpdate {
×
538
                var err error
×
539
                newPn, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().UpdateStatus(context.Background(), newPn, metav1.UpdateOptions{})
×
540
                if err != nil {
×
541
                        klog.Errorf("failed to update status of provider network %s: %v", pn.Name, err)
×
542
                        return err
×
543
                }
×
544
        }
545

546
        // update provider network spec
547
        pn, newPn = newPn, nil
×
548
        if excludeNodes := util.RemoveString(pn.Spec.ExcludeNodes, node); len(excludeNodes) != len(pn.Spec.ExcludeNodes) {
×
549
                newPn = pn.DeepCopy()
×
550
                newPn.Spec.ExcludeNodes = excludeNodes
×
551
        }
×
552

553
        var changed bool
×
554
        customInterfaces := make([]kubeovnv1.CustomInterface, 0, len(pn.Spec.CustomInterfaces))
×
555
        for _, ci := range pn.Spec.CustomInterfaces {
×
556
                nodes := util.RemoveString(ci.Nodes, node)
×
557
                if !changed {
×
558
                        changed = len(nodes) == 0 || len(nodes) != len(ci.Nodes)
×
559
                }
×
560
                if len(nodes) != 0 {
×
561
                        customInterfaces = append(customInterfaces, kubeovnv1.CustomInterface{Interface: ci.Interface, Nodes: nodes})
×
562
                }
×
563
        }
564
        if changed {
×
565
                newPn = pn.DeepCopy()
×
566
                newPn.Spec.CustomInterfaces = customInterfaces
×
567
        }
×
568
        if newPn != nil {
×
569
                if _, err := c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Update(context.Background(), newPn, metav1.UpdateOptions{}); err != nil {
×
570
                        klog.Errorf("failed to update provider network %s: %v", pn.Name, err)
×
571
                        return err
×
572
                }
×
573
        }
574

575
        return nil
×
576
}
577

578
func (c *Controller) handleUpdateNode(key string) error {
×
579
        c.nodeKeyMutex.LockKey(key)
×
580
        defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
×
581
        klog.Infof("handle update node %s", key)
×
582

×
583
        node, err := c.nodesLister.Get(key)
×
584
        if err != nil {
×
585
                if k8serrors.IsNotFound(err) {
×
586
                        return nil
×
587
                }
×
588
                klog.Errorf("failed to get node %s: %v", key, err)
×
589
                return err
×
590
        }
591

592
        if err = c.handleNodeAnnotationsForProviderNetworks(node); err != nil {
×
593
                klog.Errorf("failed to handle annotations of node %s for provider networks: %v", node.Name, err)
×
594
                return err
×
595
        }
×
596

597
        subnets, err := c.subnetsLister.List(labels.Everything())
×
598
        if err != nil {
×
599
                klog.Errorf("failed to get subnets %v", err)
×
600
                return err
×
601
        }
×
602

603
        if err := c.UpdateChassisTag(node); err != nil {
×
604
                klog.Errorf("failed to update chassis tag for node %s: %v", node.Name, err)
×
605
                return err
×
606
        }
×
607
        if err := c.retryDelDupChassis(util.ChassisRetryMaxTimes, util.ChassisControllerRetryInterval, c.cleanDuplicatedChassis, node); err != nil {
×
608
                klog.Errorf("failed to clean duplicated chassis for node %s: %v", node.Name, err)
×
609
                return err
×
610
        }
×
611

612
        c.distributedSubnetNeedSync.Store(true)
×
613

×
614
        for _, cachedSubnet := range subnets {
×
615
                if cachedSubnet.Spec.GatewayType != kubeovnv1.GWCentralizedType {
×
616
                        continue
×
617
                }
618

619
                // For subnets using GatewayNodeSelectors, always trigger reconciliation
620
                // when node labels change, since the node might have been added or removed
621
                // from the gateway list
622
                if cachedSubnet.Spec.GatewayNode == "" && len(cachedSubnet.Spec.GatewayNodeSelectors) > 0 {
×
623
                        c.addOrUpdateSubnetQueue.Add(cachedSubnet.Name)
×
624
                        continue
×
625
                }
626

627
                if util.GatewayContains(cachedSubnet.Spec.GatewayNode, node.Name) {
×
628
                        c.subnetKeyMutex.LockKey(cachedSubnet.Name)
×
629
                        err = func() error {
×
630
                                defer func() { _ = c.subnetKeyMutex.UnlockKey(cachedSubnet.Name) }()
×
631
                                return c.reconcileOvnDefaultVpcRoute(cachedSubnet.DeepCopy())
×
632
                        }()
633
                        if err != nil {
×
634
                                klog.Error(err)
×
635
                                return err
×
636
                        }
×
637
                }
638
        }
639

640
        return nil
×
641
}
642

643
func (c *Controller) syncDistributedSubnetRoutes() {
×
644
        if !c.distributedSubnetNeedSync.Swap(false) {
×
645
                return
×
646
        }
×
647

648
        klog.V(3).Infoln("start to sync distributed subnet routes")
×
649
        subnets, err := c.subnetsLister.List(labels.Everything())
×
650
        if err != nil {
×
651
                klog.Errorf("failed to list subnets: %v", err)
×
652
                c.distributedSubnetNeedSync.Store(true)
×
653
                return
×
654
        }
×
655

656
        for _, subnet := range subnets {
×
657
                if subnet.Spec.Vpc != c.config.ClusterRouter ||
×
658
                        subnet.Name == c.config.NodeSwitch ||
×
659
                        subnet.Spec.GatewayType != kubeovnv1.GWDistributedType ||
×
660
                        (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) {
×
661
                        continue
×
662
                }
663
                if err := c.reconcileDistributedSubnetRouteInDefaultVpc(subnet); err != nil {
×
664
                        klog.Errorf("failed to reconcile distributed subnet %s route: %v", subnet.Name, err)
×
665
                        c.distributedSubnetNeedSync.Store(true)
×
666
                }
×
667
        }
668
}
669

670
func (c *Controller) checkSubnetGateway() {
×
671
        if err := c.checkSubnetGatewayNode(); err != nil {
×
672
                klog.Errorf("failed to check subnet gateway node: %v", err)
×
673
        }
×
674
}
675

676
func (c *Controller) checkSubnetGatewayNode() error {
×
677
        klog.V(3).Infoln("start to check subnet gateway node")
×
678
        subnetList, err := c.subnetsLister.List(labels.Everything())
×
679
        if err != nil {
×
680
                klog.Errorf("failed to list subnets: %v", err)
×
681
                return err
×
682
        }
×
683
        nodes, err := c.nodesLister.List(labels.Everything())
×
684
        if err != nil {
×
685
                klog.Errorf("failed to list nodes: %v", err)
×
686
                return err
×
687
        }
×
688

689
        for _, subnet := range subnetList {
×
690
                if (subnet.Spec.Vlan != "" && (subnet.Spec.U2OInterconnection || !subnet.Spec.LogicalGateway)) ||
×
691
                        subnet.Spec.Vpc != c.config.ClusterRouter ||
×
692
                        subnet.Name == c.config.NodeSwitch ||
×
693
                        subnet.Spec.GatewayNode == "" ||
×
694
                        subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType ||
×
695
                        !subnet.Spec.EnableEcmp {
×
696
                        continue
×
697
                }
698
                gwNodes := strings.Split(subnet.Spec.GatewayNode, ",")
×
699
                if len(gwNodes) < 2 {
×
700
                        continue
×
701
                }
702

703
                for cidrBlock := range strings.SplitSeq(subnet.Spec.CIDRBlock, ",") {
×
704
                        skipCIDR := false
×
705
                        for _, node := range nodes {
×
706
                                if skipCIDR {
×
707
                                        break
×
708
                                }
709

710
                                ipStr := node.Annotations[util.IPAddressAnnotation]
×
711
                                for ip := range strings.SplitSeq(ipStr, ",") {
×
712
                                        if util.CheckProtocol(cidrBlock) != util.CheckProtocol(ip) {
×
713
                                                continue
×
714
                                        }
715

716
                                        isGateway := util.GatewayContains(subnet.Spec.GatewayNode, node.Name)
×
717
                                        pingSucceeded := false
×
718
                                        nodeIsReady := nodeReady(node)
×
719
                                        if isGateway {
×
720
                                                pinger, err := goping.NewPinger(ip)
×
721
                                                if err != nil {
×
722
                                                        return fmt.Errorf("failed to init pinger, %w", err)
×
723
                                                }
×
724
                                                pinger.SetPrivileged(true)
×
725

×
726
                                                count := 5
×
727
                                                pinger.Count = count
×
728
                                                pinger.Timeout = time.Duration(count) * time.Second
×
729
                                                pinger.Interval = 1 * time.Second
×
730

×
731
                                                pinger.OnRecv = func(_ *goping.Packet) {
×
732
                                                        pingSucceeded = true
×
733
                                                        pinger.Stop()
×
734
                                                }
×
735
                                                if err = pinger.Run(); err != nil {
×
736
                                                        klog.Errorf("failed to run pinger for destination %s: %v", ip, err)
×
737
                                                        return err
×
738
                                                }
×
739
                                                if pingSucceeded {
×
740
                                                        klog.V(3).Infof("succeeded to ping %s ip %s on node %s", util.NodeNic, ip, node.Name)
×
741
                                                }
×
742
                                        }
743

744
                                        err = func() error {
×
745
                                                c.subnetKeyMutex.LockKey(subnet.Name)
×
746
                                                defer func() { _ = c.subnetKeyMutex.UnlockKey(subnet.Name) }()
×
747

748
                                                nextHops, nameIPMap, err := c.getPolicyRouteParams(cidrBlock, util.GatewayRouterPolicyPriority)
×
749
                                                if err != nil {
×
750
                                                        klog.Errorf("failed to get ecmp policy route paras for subnet %s: %v", subnet.Name, err)
×
751
                                                        skipCIDR = true
×
752
                                                        return nil
×
753
                                                }
×
754

755
                                                exist := nameIPMap[node.Name] == ip
×
756
                                                if isGateway {
×
757
                                                        if !pingSucceeded || !nodeIsReady {
×
758
                                                                if exist {
×
759
                                                                        if !pingSucceeded {
×
760
                                                                                klog.Warningf("failed to ping %s ip %s on node %s", util.NodeNic, ip, node.Name)
×
761
                                                                        }
×
762
                                                                        if !nodeIsReady {
×
763
                                                                                klog.Warningf("node %s is not ready", node.Name)
×
764
                                                                        }
×
765
                                                                        klog.Warningf("delete ecmp policy route for node %s ip %s", node.Name, ip)
×
766
                                                                        nextHops.Remove(ip)
×
767
                                                                        delete(nameIPMap, node.Name)
×
768
                                                                        klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
769
                                                                        if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
770
                                                                                klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
×
771
                                                                                return err
×
772
                                                                        }
×
773
                                                                }
774
                                                        } else if !exist {
×
775
                                                                nextHops.Add(ip)
×
776
                                                                if nameIPMap == nil {
×
777
                                                                        nameIPMap = make(map[string]string, 1)
×
778
                                                                }
×
779
                                                                nameIPMap[node.Name] = ip
×
780
                                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
781
                                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
782
                                                                        klog.Errorf("failed to add ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
×
783
                                                                        return err
×
784
                                                                }
×
785
                                                        }
786
                                                } else if exist {
×
787
                                                        klog.Infof("subnet %s gateway nodes does not contain node %s, delete policy route for node ip %s", subnet.Name, node.Name, ip)
×
788
                                                        nextHops.Remove(ip)
×
789
                                                        delete(nameIPMap, node.Name)
×
790
                                                        klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
791
                                                        if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
792
                                                                klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
×
793
                                                                return err
×
794
                                                        }
×
795
                                                }
796

797
                                                return nil
×
798
                                        }()
799
                                        if err != nil {
×
800
                                                return err
×
801
                                        }
×
802
                                        if skipCIDR {
×
803
                                                break
×
804
                                        }
805
                                }
806
                        }
807
                }
808
        }
809

810
        return nil
×
811
}
812

813
func (c *Controller) cleanDuplicatedChassis(node *v1.Node) error {
1✔
814
        // if multi chassis has the same node name, delete all of them
1✔
815
        _, err := c.OVNSbClient.GetChassisByHost(node.Name)
1✔
816
        if err == nil {
2✔
817
                return nil
1✔
818
        }
1✔
819

820
        if !errors.Is(err, ovs.ErrOneNodeMultiChassis) {
2✔
821
                klog.Errorf("failed to get chassis for node %s: %v", node.Name, err)
1✔
822
                return err
1✔
823
        }
1✔
824

825
        klog.Warningf("node %s has multiple chassis, deleting all", node.Name)
1✔
826
        if err := c.OVNSbClient.DeleteChassisByHost(node.Name); err != nil {
2✔
827
                klog.Errorf("failed to delete chassis for node %s: %v", node.Name, err)
1✔
828
                return err
1✔
829
        }
1✔
830
        return nil
1✔
831
}
832

833
func (c *Controller) retryDelDupChassis(attempts, sleep int, f func(node *v1.Node) error, node *v1.Node) error {
×
834
        for i := range attempts {
×
835
                err := f(node)
×
836
                if err == nil {
×
837
                        return nil
×
838
                }
×
839
                klog.Errorf("failed to delete duplicated chassis for node %s: %v", node.Name, err)
×
840
                if i < attempts-1 {
×
841
                        time.Sleep(time.Duration(sleep) * time.Second)
×
842
                }
×
843
        }
844
        errMsg := errors.New("exhausting all attempts")
×
845
        klog.Error(errMsg)
×
846
        return errMsg
×
847
}
848

849
func (c *Controller) fetchPodsOnNode(nodeName string, pods []*v1.Pod) ([]string, error) {
1✔
850
        ports := make([]string, 0, len(pods))
1✔
851
        for _, pod := range pods {
1✔
852
                if pod.Spec.HostNetwork || pod.Spec.NodeName != nodeName || !isPodAlive(pod) {
×
853
                        continue
×
854
                }
855

856
                if pod.Annotations[util.LogicalRouterAnnotation] != c.config.ClusterRouter {
×
857
                        subnetName := pod.Annotations[util.LogicalSwitchAnnotation]
×
858
                        if subnetName == "" {
×
859
                                klog.V(4).Infof("Pod %s/%s is not on cluster router and has no logical switch annotation, skipping for VLAN check.", pod.Namespace, pod.Name)
×
860
                                continue
×
861
                        }
862

863
                        subnet, err := c.subnetsLister.Get(subnetName)
×
864
                        if err != nil {
×
865
                                klog.Errorf("failed to get subnet %s: %v", subnetName, err)
×
866
                                return nil, err
×
867
                        }
×
868

869
                        if subnet.Spec.Vlan == "" {
×
870
                                continue
×
871
                        }
872
                }
873

874
                podName := c.getNameByPod(pod)
×
875

×
876
                podNets, err := c.getPodKubeovnNets(pod)
×
877
                if err != nil {
×
878
                        klog.Errorf("failed to get pod nets %v", err)
×
879
                        return nil, err
×
880
                }
×
881

882
                for _, podNet := range podNets {
×
883
                        if !isOvnSubnet(podNet.Subnet) {
×
884
                                continue
×
885
                        }
886

887
                        if pod.Annotations != nil && pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "true" {
×
888
                                ports = append(ports, ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName))
×
889
                        }
×
890
                }
891
        }
892
        return ports, nil
1✔
893
}
894

895
func (c *Controller) CheckNodePortGroup() {
×
896
        if err := c.checkAndUpdateNodePortGroup(); err != nil {
×
897
                klog.Errorf("check node port group status: %v", err)
×
898
        }
×
899
}
900

901
func (c *Controller) checkAndUpdateNodePortGroup() error {
1✔
902
        klog.V(3).Infoln("start to check node port-group status")
1✔
903
        var networkPolicyExists bool
1✔
904
        if c.config.EnableNP {
2✔
905
                np, err := c.npsLister.List(labels.Everything())
1✔
906
                if err != nil {
2✔
907
                        klog.Errorf("failed to list network policies: %v", err)
1✔
908
                        return err
1✔
909
                }
1✔
910
                networkPolicyExists = len(np) != 0
×
911
        }
912

913
        nodes, err := c.nodesLister.List(labels.Everything())
1✔
914
        if err != nil {
1✔
915
                klog.Errorf("list nodes: %v", err)
×
916
                return err
×
917
        }
×
918

919
        pods, err := c.podsLister.List(labels.Everything())
1✔
920
        if err != nil {
1✔
921
                klog.Errorf("list pods, %v", err)
×
922
                return err
×
923
        }
×
924

925
        for _, node := range nodes {
2✔
926
                // The port-group should already created when add node
1✔
927
                pgName := strings.ReplaceAll(node.Annotations[util.PortNameAnnotation], "-", ".")
1✔
928
                if pgName == "" {
2✔
929
                        klog.V(2).Infof("node %s does not have port name annotation, skip port group update", node.Name)
1✔
930
                        continue
1✔
931
                }
932

933
                // use join IP only when no internal IP exists
934
                nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
1✔
935
                joinIP := node.Annotations[util.IPAddressAnnotation]
1✔
936
                joinIPv4, joinIPv6 := util.SplitStringIP(joinIP)
1✔
937
                if nodeIPv4 == "" {
1✔
938
                        nodeIPv4 = joinIPv4
×
939
                }
×
940
                if nodeIPv6 == "" {
2✔
941
                        nodeIPv6 = joinIPv6
1✔
942
                }
1✔
943
                nodeIP := strings.Trim(fmt.Sprintf("%s,%s", nodeIPv4, nodeIPv6), ",")
1✔
944

1✔
945
                nodePorts, err := c.fetchPodsOnNode(node.Name, pods)
1✔
946
                if err != nil {
1✔
947
                        klog.Errorf("fetch pods for node %v: %v", node.Name, err)
×
948
                        return err
×
949
                }
×
950

951
                if err = c.OVNNbClient.PortGroupSetPorts(pgName, nodePorts); err != nil {
1✔
952
                        klog.Errorf("failed to set ports of port group %s: %v", pgName, err)
×
953
                        return err
×
954
                }
×
955

956
                if networkPolicyExists {
1✔
957
                        if err := c.OVNNbClient.CreateNodeACL(pgName, nodeIP, joinIP); err != nil {
×
958
                                klog.Errorf("create node acl for node pg %s: %v", pgName, err)
×
959
                        }
×
960
                } else {
1✔
961
                        // clear all acl
1✔
962
                        if err = c.OVNNbClient.DeleteAcls(pgName, portGroupKey, "", nil); err != nil {
1✔
963
                                klog.Errorf("delete node acl for node pg %s: %v", pgName, err)
×
964
                        }
×
965
                }
966
        }
967

968
        return nil
1✔
969
}
970

971
func (c *Controller) UpdateChassisTag(node *v1.Node) error {
×
972
        annoChassisName := node.Annotations[util.ChassisAnnotation]
×
973
        if annoChassisName == "" {
×
974
                // kube-ovn-cni not ready to set chassis
×
975
                return nil
×
976
        }
×
977
        chassis, err := c.OVNSbClient.GetChassis(annoChassisName, true)
×
978
        if err != nil {
×
979
                klog.Errorf("failed to get chassis %s for node %s: %v", annoChassisName, node.Name, err)
×
980
                return err
×
981
        }
×
982
        if chassis == nil {
×
983
                klog.Infof("chassis %q not registered for node %s, do chassis gc once", annoChassisName, node.Name)
×
984
                // chassis name conflict, do GC
×
985
                if err = c.gcChassis(); err != nil {
×
986
                        klog.Errorf("failed to gc chassis: %v", err)
×
987
                        return err
×
988
                }
×
989
                err = &ErrChassisNotFound{Chassis: annoChassisName, Node: node.Name}
×
990
                klog.Error(err)
×
991
                return err
×
992
        }
993

994
        if chassis.ExternalIDs == nil || chassis.ExternalIDs["vendor"] != util.CniTypeName {
×
995
                klog.Infof("init tag %s for node %s chassis %s", util.CniTypeName, node.Name, chassis.Name)
×
996
                if err = c.OVNSbClient.UpdateChassisTag(chassis.Name, node.Name); err != nil {
×
997
                        err := fmt.Errorf("failed to init chassis tag, %w", err)
×
998
                        klog.Error(err)
×
999
                        return err
×
1000
                }
×
1001
        }
1002
        return nil
×
1003
}
1004

1005
func (c *Controller) getPolicyRouteParams(cidr string, priority int) (*strset.Set, map[string]string, error) {
1✔
1006
        ipSuffix := "ip4"
1✔
1007
        if util.CheckProtocol(cidr) == kubeovnv1.ProtocolIPv6 {
1✔
1008
                ipSuffix = "ip6"
×
1009
        }
×
1010
        match := fmt.Sprintf("%s.src == %s", ipSuffix, cidr)
1✔
1011
        policyList, err := c.OVNNbClient.GetLogicalRouterPolicy(c.config.ClusterRouter, priority, match, true)
1✔
1012
        if err != nil {
1✔
1013
                klog.Errorf("failed to get logical router policy: %v", err)
×
1014
                return nil, nil, err
×
1015
        }
×
1016
        if len(policyList) == 0 {
1✔
1017
                return strset.New(), map[string]string{}, nil
×
1018
        }
×
1019
        return strset.New(policyList[0].Nexthops...), maps.Clone(policyList[0].ExternalIDs), nil
1✔
1020
}
1021

1022
func (c *Controller) deletePolicyRouteForNode(nodeName, portName string) error {
×
1023
        subnets, err := c.subnetsLister.List(labels.Everything())
×
1024
        if err != nil {
×
1025
                klog.Errorf("get subnets: %v", err)
×
1026
                return err
×
1027
        }
×
1028

1029
        addresses := c.ipam.GetPodAddress(portName)
×
1030
        for _, addr := range addresses {
×
1031
                if addr.IP == "" {
×
1032
                        continue
×
1033
                }
1034
                klog.Infof("deleting logical router policy with nexthop %q from %s for node %s", addr.IP, c.config.ClusterRouter, nodeName)
×
1035
                if err = c.OVNNbClient.DeleteLogicalRouterPolicyByNexthop(c.config.ClusterRouter, util.NodeRouterPolicyPriority, addr.IP); err != nil {
×
1036
                        klog.Errorf("failed to delete logical router policy with nexthop %q from %s for node %s: %v", addr.IP, c.config.ClusterRouter, nodeName, err)
×
1037
                        return err
×
1038
                }
×
1039
        }
1040

1041
        for _, subnet := range subnets {
×
1042
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch {
×
1043
                        continue
×
1044
                }
1045

1046
                if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
×
1047
                        pgName := getOverlaySubnetsPortGroupName(subnet.Name, nodeName)
×
1048
                        if err = c.OVNNbClient.DeletePortGroup(pgName); err != nil {
×
1049
                                klog.Errorf("delete port group for subnet %s and node %s: %v", subnet.Name, nodeName, err)
×
1050
                                return err
×
1051
                        }
×
1052

1053
                        klog.Infof("delete policy route for distributed subnet %s, node %s", subnet.Name, nodeName)
×
1054
                        if err = c.deletePolicyRouteForDistributedSubnet(subnet, nodeName); err != nil {
×
1055
                                klog.Errorf("delete policy route for subnet %s and node %s: %v", subnet.Name, nodeName, err)
×
1056
                                return err
×
1057
                        }
×
1058
                }
1059

1060
                if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType {
×
1061
                        c.subnetKeyMutex.LockKey(subnet.Name)
×
1062
                        err = func() error {
×
1063
                                defer func() { _ = c.subnetKeyMutex.UnlockKey(subnet.Name) }()
×
1064
                                if subnet.Spec.EnableEcmp {
×
1065
                                        for cidrBlock := range strings.SplitSeq(subnet.Spec.CIDRBlock, ",") {
×
1066
                                                nextHops, nameIPMap, err := c.getPolicyRouteParams(cidrBlock, util.GatewayRouterPolicyPriority)
×
1067
                                                if err != nil {
×
1068
                                                        klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
×
1069
                                                        continue
×
1070
                                                }
1071

1072
                                                exist := false
×
1073
                                                if _, ok := nameIPMap[nodeName]; ok {
×
1074
                                                        exist = true
×
1075
                                                }
×
1076

1077
                                                if exist {
×
1078
                                                        nextHops.Remove(nameIPMap[nodeName])
×
1079
                                                        delete(nameIPMap, nodeName)
×
1080

×
1081
                                                        if nextHops.Size() == 0 {
×
1082
                                                                klog.Infof("delete policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
1083
                                                                if err := c.deletePolicyRouteForCentralizedSubnet(subnet); err != nil {
×
1084
                                                                        klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
×
1085
                                                                        return err
×
1086
                                                                }
×
1087
                                                        } else {
×
1088
                                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
1089
                                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
1090
                                                                        klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
×
1091
                                                                        return err
×
1092
                                                                }
×
1093
                                                        }
1094
                                                }
1095
                                        }
1096
                                } else {
×
1097
                                        klog.Infof("reconcile policy route for centralized subnet %s", subnet.Name)
×
1098
                                        if err := c.reconcileDefaultCentralizedSubnetRouteInDefaultVpc(subnet); err != nil {
×
1099
                                                klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
×
1100
                                                return err
×
1101
                                        }
×
1102
                                }
1103
                                return nil
×
1104
                        }()
1105
                        if err != nil {
×
1106
                                return err
×
1107
                        }
×
1108
                }
1109
        }
1110
        return nil
×
1111
}
1112

1113
func (c *Controller) addPolicyRouteForCentralizedSubnetOnNode(node *v1.Node, nodeIP string) error {
×
1114
        subnets, err := c.subnetsLister.List(labels.Everything())
×
1115
        if err != nil {
×
1116
                klog.Errorf("failed to get subnets %v", err)
×
1117
                return err
×
1118
        }
×
1119

1120
        for _, subnet := range subnets {
×
1121
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType {
×
1122
                        continue
×
1123
                }
1124
                c.subnetKeyMutex.LockKey(subnet.Name)
×
1125
                err = func() error {
×
1126
                        defer func() { _ = c.subnetKeyMutex.UnlockKey(subnet.Name) }()
×
1127
                        nodeName := node.Name
×
1128
                        if subnet.Spec.EnableEcmp {
×
1129
                                if !util.GatewayContains(subnet.Spec.GatewayNode, nodeName) &&
×
1130
                                        (subnet.Spec.GatewayNode != "" || !util.MatchLabelSelectors(subnet.Spec.GatewayNodeSelectors, node.Labels)) {
×
1131
                                        return nil
×
1132
                                }
×
1133

1134
                                for nextHop := range strings.SplitSeq(nodeIP, ",") {
×
1135
                                        for cidrBlock := range strings.SplitSeq(subnet.Spec.CIDRBlock, ",") {
×
1136
                                                if util.CheckProtocol(cidrBlock) != util.CheckProtocol(nextHop) {
×
1137
                                                        continue
×
1138
                                                }
1139

1140
                                                nextHops, nameIPMap, err := c.getPolicyRouteParams(cidrBlock, util.GatewayRouterPolicyPriority)
×
1141
                                                if err != nil {
×
1142
                                                        klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
×
1143
                                                        continue
×
1144
                                                }
1145
                                                if nameIPMap[nodeName] == nextHop {
×
1146
                                                        continue
×
1147
                                                }
1148

1149
                                                nextHops.Add(nextHop)
×
1150
                                                if nameIPMap == nil {
×
1151
                                                        nameIPMap = make(map[string]string, 1)
×
1152
                                                }
×
1153
                                                nameIPMap[nodeName] = nextHop
×
1154
                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
1155
                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
1156
                                                        klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
×
1157
                                                        return err
×
1158
                                                }
×
1159
                                        }
1160
                                }
1161
                        } else {
×
1162
                                if subnet.Status.ActivateGateway != nodeName {
×
1163
                                        return nil
×
1164
                                }
×
1165
                                klog.Infof("add policy route for centralized subnet %s, on node %s, ip %s", subnet.Name, nodeName, nodeIP)
×
1166
                                if err = c.addPolicyRouteForCentralizedSubnet(subnet, nodeName, nil, strings.Split(nodeIP, ",")); err != nil {
×
1167
                                        klog.Errorf("failed to add active-backup policy route for centralized subnet %s: %v", subnet.Name, err)
×
1168
                                        return err
×
1169
                                }
×
1170
                        }
1171
                        return nil
×
1172
                }()
1173
                if err != nil {
×
1174
                        return err
×
1175
                }
×
1176
        }
1177
        return nil
×
1178
}
1179

1180
func (c *Controller) addPolicyRouteForLocalDNSCacheOnNode(dnsIPs []string, nodePortName, nodeIP, nodeName string, af int) error {
1✔
1181
        if len(dnsIPs) == 0 {
1✔
1182
                return c.deletePolicyRouteForLocalDNSCacheOnNode(nodeName, af)
×
1183
        }
×
1184

1185
        var (
1✔
1186
                externalIDs = map[string]string{
1✔
1187
                        "vendor":          util.CniTypeName,
1✔
1188
                        "node":            nodeName,
1✔
1189
                        "address-family":  strconv.Itoa(af),
1✔
1190
                        "isLocalDnsCache": "true",
1✔
1191
                }
1✔
1192
                pgAs     = strings.ReplaceAll(fmt.Sprintf("%s_ip%d", nodePortName, af), "-", ".")
1✔
1193
                action   = kubeovnv1.PolicyRouteActionReroute
1✔
1194
                nextHops = []string{nodeIP}
1✔
1195
        )
1✔
1196
        matches := strset.NewWithSize(len(dnsIPs))
1✔
1197
        for _, ip := range dnsIPs {
2✔
1198
                matches.Add(fmt.Sprintf("ip%d.src == $%s && ip%d.dst == %s", af, pgAs, af, ip))
1✔
1199
        }
1✔
1200

1201
        policies, err := c.OVNNbClient.ListLogicalRouterPolicies(c.config.ClusterRouter, -1, externalIDs, true)
1✔
1202
        if err != nil {
1✔
1203
                klog.Errorf("failed to list logical router policies for node %q af %d: %v", nodeName, af, err)
×
1204
                return err
×
1205
        }
×
1206

1207
        for _, policy := range policies {
2✔
1208
                if policy.Priority == util.NodeRouterPolicyPriority && policy.Action == string(action) && slices.Equal(policy.Nexthops, nextHops) && matches.Has(policy.Match) {
2✔
1209
                        matches.Remove(policy.Match)
1✔
1210
                        continue
1✔
1211
                }
1212
                // delete unused policy router policy
1213
                klog.Infof("deleting logical router policy by UUID %s", policy.UUID)
1✔
1214
                if err = c.OVNNbClient.DeleteLogicalRouterPolicyByUUID(c.config.ClusterRouter, policy.UUID); err != nil {
1✔
1215
                        klog.Errorf("failed to delete logical router policy by UUID %s: %v", policy.UUID, err)
×
1216
                        return err
×
1217
                }
×
1218
        }
1219

1220
        for _, match := range matches.List() {
2✔
1221
                klog.Infof("add node local dns cache policy route for router %s: match %q, action %q, nexthop %q, externalID %v", c.config.ClusterRouter, match, action, nodeIP, externalIDs)
1✔
1222
                if err := c.addPolicyRouteToVpc(
1✔
1223
                        c.config.ClusterRouter,
1✔
1224
                        &kubeovnv1.PolicyRoute{
1✔
1225
                                Priority:  util.NodeRouterPolicyPriority,
1✔
1226
                                Match:     match,
1✔
1227
                                Action:    action,
1✔
1228
                                NextHopIP: nodeIP,
1✔
1229
                        },
1✔
1230
                        externalIDs,
1✔
1231
                ); err != nil {
1✔
1232
                        klog.Errorf("failed to add logical router policy for node %s: %v", nodeName, err)
×
1233
                        return err
×
1234
                }
×
1235
        }
1236

1237
        return nil
1✔
1238
}
1239

1240
func (c *Controller) deletePolicyRouteForLocalDNSCacheOnNode(nodeName string, af int) error {
×
1241
        policies, err := c.OVNNbClient.ListLogicalRouterPolicies(c.config.ClusterRouter, -1, map[string]string{
×
1242
                "vendor":          util.CniTypeName,
×
1243
                "node":            nodeName,
×
1244
                "address-family":  strconv.Itoa(af),
×
1245
                "isLocalDnsCache": "true",
×
1246
        }, true)
×
1247
        if err != nil {
×
1248
                klog.Errorf("failed to list logical router policies: %v", err)
×
1249
                return err
×
1250
        }
×
1251
        if len(policies) == 0 {
×
1252
                return nil
×
1253
        }
×
1254

1255
        for _, policy := range policies {
×
1256
                klog.Infof("delete node local dns cache policy route for router %s with match %s", c.config.ClusterRouter, policy.Match)
×
1257

×
1258
                if err := c.OVNNbClient.DeleteLogicalRouterPolicyByUUID(c.config.ClusterRouter, policy.UUID); err != nil {
×
1259
                        klog.Errorf("failed to delete policy route for node local dns in router %s with match %s: %v", c.config.ClusterRouter, policy.Match, err)
×
1260
                        return err
×
1261
                }
×
1262
        }
1263
        return nil
×
1264
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc