• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 27750930967

18 Jun 2026 09:43AM UTC coverage: 21.802% (+0.1%) from 21.689%
27750930967

Pull #6899

github

zhangzujian
fix: reconcile vpc bfd ha chassis on node changes

Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>
Pull Request #6899: fix: reconcile VPC BFD HA chassis on node changes

35 of 63 new or added lines in 2 files covered. (55.56%)

1 existing line in 1 file now uncovered.

10991 of 50412 relevant lines covered (21.8%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

7.08
/pkg/controller/node.go
1
package controller
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "maps"
8
        "reflect"
9
        "slices"
10
        "strconv"
11
        "strings"
12
        "time"
13

14
        goping "github.com/prometheus-community/pro-bing"
15
        "github.com/scylladb/go-set/strset"
16
        v1 "k8s.io/api/core/v1"
17
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/labels"
20
        "k8s.io/apimachinery/pkg/types"
21
        "k8s.io/client-go/tools/cache"
22
        "k8s.io/klog/v2"
23

24
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
25
        "github.com/kubeovn/kube-ovn/pkg/ovs"
26
        "github.com/kubeovn/kube-ovn/pkg/util"
27
)
28

29
// ErrChassisNotFound is the error value used when a chassis cannot be found
// for a node.
type ErrChassisNotFound struct {
	// Chassis is the chassis reported in the error message.
	Chassis string
	// Node is the node reported in the error message.
	Node string
}

// Error implements the error interface and renders both the chassis and the
// node into a single message.
func (e *ErrChassisNotFound) Error() string {
	const format = "chassis %s not found for node %s"
	return fmt.Sprintf(format, e.Chassis, e.Node)
}
×
37

38
func (c *Controller) enqueueAddNode(obj any) {
1✔
39
        node := obj.(*v1.Node)
1✔
40
        key := cache.MetaObjectToName(node).String()
1✔
41
        klog.V(3).Infof("enqueue add node %s", key)
1✔
42
        c.addNodeQueue.Add(key)
1✔
43
        c.enqueueVpcBFDPortByNodeChange(nil, node)
1✔
44
}
1✔
45

46
func nodeReady(node *v1.Node) bool {
1✔
47
        var ready, networkUnavailable bool
1✔
48
        for _, c := range node.Status.Conditions {
2✔
49
                switch c.Type {
1✔
50
                case v1.NodeReady:
1✔
51
                        ready = c.Status == v1.ConditionTrue
1✔
52
                case v1.NodeNetworkUnavailable:
×
53
                        networkUnavailable = c.Status == v1.ConditionTrue
×
54
                }
55
        }
56
        return ready && !networkUnavailable
1✔
57
}
58

59
func (c *Controller) listVpcBFDPorts() ([]*kubeovnv1.Vpc, error) {
1✔
60
        vpcs, err := c.vpcsLister.List(labels.Everything())
1✔
61
        if err != nil {
1✔
NEW
62
                return nil, err
×
NEW
63
        }
×
64

65
        bfdVpcs := make([]*kubeovnv1.Vpc, 0, len(vpcs))
1✔
66
        for _, vpc := range vpcs {
2✔
67
                if vpc.Labels != nil && vpc.Labels[util.VpcExternalLabel] == "true" {
1✔
NEW
68
                        continue
×
69
                }
70
                if !vpc.Spec.BFDPort.IsEnabled() {
1✔
NEW
71
                        continue
×
72
                }
73
                bfdVpcs = append(bfdVpcs, vpc)
1✔
74
        }
75
        return bfdVpcs, nil
1✔
76
}
77

78
func (c *Controller) enqueueVpcBFDPortByNodeChange(oldNode, newNode *v1.Node) {
1✔
79
        vpcs, err := c.listVpcBFDPorts()
1✔
80
        if err != nil {
1✔
NEW
81
                klog.Errorf("failed to list VPC BFD ports for node change: %v", err)
×
NEW
82
                return
×
NEW
83
        }
×
84

85
        for _, vpc := range vpcs {
2✔
86
                selector := labels.Everything()
1✔
87
                if vpc.Spec.BFDPort.NodeSelector != nil {
2✔
88
                        selector, err = metav1.LabelSelectorAsSelector(vpc.Spec.BFDPort.NodeSelector)
1✔
89
                        if err != nil {
1✔
NEW
90
                                klog.Errorf("failed to parse BFD port node selector of vpc %s: %v", vpc.Name, err)
×
NEW
91
                                c.addOrUpdateVpcQueue.Add(vpc.Name)
×
NEW
92
                                continue
×
93
                        }
94
                }
95

96
                oldMatched := oldNode != nil && selector.Matches(labels.Set(oldNode.Labels))
1✔
97
                newMatched := newNode != nil && selector.Matches(labels.Set(newNode.Labels))
1✔
98
                nodeReadyChanged := oldNode != nil && newNode != nil && oldMatched && newMatched && nodeReady(oldNode) != nodeReady(newNode)
1✔
99
                if oldMatched != newMatched || nodeReadyChanged {
2✔
100
                        klog.V(3).Infof("enqueue update vpc %s for BFD port triggered by node change", vpc.Name)
1✔
101
                        c.addOrUpdateVpcQueue.Add(vpc.Name)
1✔
102
                }
1✔
103
        }
104
}
105

106
func (c *Controller) enqueueUpdateNode(oldObj, newObj any) {
1✔
107
        oldNode := oldObj.(*v1.Node)
1✔
108
        newNode := newObj.(*v1.Node)
1✔
109
        nodeReadyChanged := nodeReady(oldNode) != nodeReady(newNode)
1✔
110
        nodeLabelsChanged := !maps.Equal(oldNode.Labels, newNode.Labels)
1✔
111

1✔
112
        key := cache.MetaObjectToName(newNode).String()
1✔
113
        if nodeReadyChanged ||
1✔
114
                !reflect.DeepEqual(oldNode.Annotations, newNode.Annotations) {
2✔
115
                if len(newNode.Annotations) == 0 || newNode.Annotations[util.AllocatedAnnotation] != "true" {
2✔
116
                        klog.V(3).Infof("enqueue add node %s", key)
1✔
117
                        c.addNodeQueue.Add(key)
1✔
118
                } else {
1✔
119
                        klog.V(3).Infof("enqueue update node %s", key)
×
120
                        c.updateNodeQueue.Add(key)
×
121
                }
×
122
        }
123
        if nodeReadyChanged || nodeLabelsChanged {
2✔
124
                c.enqueueVpcBFDPortByNodeChange(oldNode, newNode)
1✔
125
        }
1✔
126
}
127

128
func (c *Controller) enqueueDeleteNode(obj any) {
1✔
129
        var node *v1.Node
1✔
130
        switch t := obj.(type) {
1✔
131
        case *v1.Node:
×
132
                node = t
×
133
        case cache.DeletedFinalStateUnknown:
1✔
134
                n, ok := t.Obj.(*v1.Node)
1✔
135
                if !ok {
1✔
136
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
137
                        return
×
138
                }
×
139
                node = n
1✔
140
        default:
×
141
                klog.Warningf("unexpected type: %T", obj)
×
142
                return
×
143
        }
144

145
        key := cache.MetaObjectToName(node).String()
1✔
146
        klog.V(3).Infof("enqueue delete node %s", key)
1✔
147
        c.deletingNodeObjMap.Store(key, node)
1✔
148
        c.deleteNodeQueue.Add(key)
1✔
149
        c.enqueueVpcBFDPortByNodeChange(node, nil)
1✔
150
}
151

152
// nodeUnderlayAddressSetName builds the name of the underlay address set of a
// node for the given address family (af, e.g. 4 or 6). Every '-' in the node
// name is replaced by '_'.
func nodeUnderlayAddressSetName(node string, af int) string {
	sanitized := strings.ReplaceAll(node, "-", "_")
	return fmt.Sprintf("node_%s_underlay_v%d", sanitized, af)
}
×
155

156
func (c *Controller) handleAddNode(key string) error {
×
157
        c.nodeKeyMutex.LockKey(key)
×
158
        defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
×
159

160
        cachedNode, err := c.nodesLister.Get(key)
×
161
        if err != nil {
×
162
                if k8serrors.IsNotFound(err) {
×
163
                        return nil
×
164
                }
×
165
                klog.Errorf("failed to get node %s: %v", key, err)
×
166
                return err
×
167
        }
168
        node := cachedNode.DeepCopy()
×
169
        klog.Infof("handle add node %s", node.Name)
×
170

×
171
        subnets, err := c.subnetsLister.List(labels.Everything())
×
172
        if err != nil {
×
173
                klog.Errorf("failed to list subnets: %v", err)
×
174
                return err
×
175
        }
×
176

177
        nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
×
178
        for _, subnet := range subnets {
×
179
                if subnet.Spec.Vpc != c.config.ClusterRouter {
×
180
                        continue
×
181
                }
182

183
                v4, v6 := util.SplitStringIP(subnet.Spec.CIDRBlock)
×
184
                if subnet.Spec.Vlan == "" && (util.CIDRContainIP(v4, nodeIPv4) || util.CIDRContainIP(v6, nodeIPv6)) {
×
185
                        msg := fmt.Sprintf("internal IP address of node %s is in CIDR of subnet %s, this may result in network issues", node.Name, subnet.Name)
×
186
                        klog.Warning(msg)
×
187
                        c.recorder.Eventf(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: node.Name, UID: types.UID(node.Name)}}, v1.EventTypeWarning, "NodeAddressConflictWithSubnet", msg)
×
188
                        break
×
189
                }
190
        }
191

192
        if err = c.handleNodeAnnotationsForProviderNetworks(node); err != nil {
×
193
                klog.Errorf("failed to handle annotations of node %s for provider networks: %v", node.Name, err)
×
194
                return err
×
195
        }
×
196

197
        subnet, err := c.subnetsLister.Get(c.config.NodeSwitch)
×
198
        if err != nil {
×
199
                klog.Errorf("failed to get node subnet: %v", err)
×
200
                return err
×
201
        }
×
202

203
        var v4IP, v6IP, mac string
×
204

×
205
        portName := util.NodeLspName(key)
×
206
        if node.Annotations[util.AllocatedAnnotation] == "true" && node.Annotations[util.IPAddressAnnotation] != "" && node.Annotations[util.MacAddressAnnotation] != "" {
×
207
                macStr := node.Annotations[util.MacAddressAnnotation]
×
208
                v4IP, v6IP, mac, err = c.ipam.GetStaticAddress(portName, portName, node.Annotations[util.IPAddressAnnotation],
×
209
                        &macStr, node.Annotations[util.LogicalSwitchAnnotation], true)
×
210
                if err != nil {
×
211
                        klog.Errorf("failed to alloc static ip addrs for node %v: %v", node.Name, err)
×
212
                        return err
×
213
                }
×
214
        } else {
×
215
                v4IP, v6IP, mac, err = c.ipam.GetRandomAddress(portName, portName, nil, c.config.NodeSwitch, "", nil, true)
×
216
                if err != nil {
×
217
                        klog.Errorf("failed to alloc random ip addrs for node %v: %v", node.Name, err)
×
218
                        return err
×
219
                }
×
220

221
                // Clean up potentially existing logical switch ports to avoid leftover issues from previous failed configurations
222
                if err := c.OVNNbClient.DeleteLogicalSwitchPort(portName); err != nil {
×
223
                        klog.Errorf("failed to delete stale logical switch port %s: %v", portName, err)
×
224
                        return err
×
225
                }
×
226
                klog.Infof("deleted stale logical switch port %s", portName)
×
227
        }
228

229
        ipStr := util.GetStringIP(v4IP, v6IP)
×
230
        if err := c.OVNNbClient.CreateBareLogicalSwitchPort(c.config.NodeSwitch, portName, ipStr, mac); err != nil {
×
231
                klog.Errorf("failed to create logical switch port %s: %v", portName, err)
×
232
                return err
×
233
        }
×
234

235
        for ip := range strings.SplitSeq(ipStr, ",") {
×
236
                if ip == "" {
×
237
                        continue
×
238
                }
239

240
                nodeIP, af := nodeIPv4, 4
×
241
                protocol := util.CheckProtocol(ip)
×
242
                if protocol == kubeovnv1.ProtocolIPv6 {
×
243
                        nodeIP, af = nodeIPv6, 6
×
244
                }
×
245
                if nodeIP != "" {
×
246
                        var (
×
247
                                match       = fmt.Sprintf("ip%d.dst == %s", af, nodeIP)
×
248
                                action      = kubeovnv1.PolicyRouteActionReroute
×
249
                                externalIDs = map[string]string{
×
250
                                        "vendor":         util.CniTypeName,
×
251
                                        "node":           node.Name,
×
252
                                        "address-family": strconv.Itoa(af),
×
253
                                }
×
254
                        )
×
255
                        klog.Infof("add policy route for router: %s, match %s, action %s, nexthop %s, externalID %v", c.config.ClusterRouter, match, action, ip, externalIDs)
×
256
                        if err = c.addPolicyRouteToVpc(
×
257
                                c.config.ClusterRouter,
×
258
                                &kubeovnv1.PolicyRoute{
×
259
                                        Priority:  util.NodeRouterPolicyPriority,
×
260
                                        Match:     match,
×
261
                                        Action:    action,
×
262
                                        NextHopIP: ip,
×
263
                                },
×
264
                                externalIDs,
×
265
                        ); err != nil {
×
266
                                klog.Errorf("failed to add logical router policy for node %s: %v", node.Name, err)
×
267
                                return err
×
268
                        }
×
269

270
                        dnsIPs := make([]string, 0, len(c.config.NodeLocalDNSIPs))
×
271
                        for _, ip := range c.config.NodeLocalDNSIPs {
×
272
                                if util.CheckProtocol(ip) == protocol {
×
273
                                        dnsIPs = append(dnsIPs, ip)
×
274
                                }
×
275
                        }
276

277
                        if err = c.addPolicyRouteForLocalDNSCacheOnNode(dnsIPs, portName, ip, node.Name, af); err != nil {
×
278
                                klog.Errorf("failed to add policy route for node %s: %v", node.Name, err)
×
279
                                return err
×
280
                        }
×
281
                }
282
        }
283

284
        if err := c.addNodeGatewayStaticRoute(); err != nil {
×
285
                klog.Errorf("failed to add static route for node gw: %v", err)
×
286
                return err
×
287
        }
×
288

289
        patch := util.KVPatch{
×
290
                util.IPAddressAnnotation:     ipStr,
×
291
                util.MacAddressAnnotation:    mac,
×
292
                util.CidrAnnotation:          subnet.Spec.CIDRBlock,
×
293
                util.GatewayAnnotation:       subnet.Spec.Gateway,
×
294
                util.LogicalSwitchAnnotation: c.config.NodeSwitch,
×
295
                util.AllocatedAnnotation:     "true",
×
296
                util.PortNameAnnotation:      portName,
×
297
        }
×
298
        if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
299
                klog.Errorf("failed to update annotations of node %s: %v", node.Name, err)
×
300
                return err
×
301
        }
×
302

303
        if err := c.createOrUpdateIPCR("", "", ipStr, mac, c.config.NodeSwitch, "", node.Name, ""); err != nil {
×
304
                klog.Errorf("failed to create or update IPs %s: %v", portName, err)
×
305
                return err
×
306
        }
×
307

308
        for _, subnet := range subnets {
×
309
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWDistributedType {
×
310
                        continue
×
311
                }
312
                if err = c.createPortGroupForDistributedSubnet(node, subnet); err != nil {
×
313
                        klog.Errorf("failed to create port group for node %s and subnet %s: %v", node.Name, subnet.Name, err)
×
314
                        return err
×
315
                }
×
316
                // policy route for overlay distributed subnet should be reconciled when node ip changed
317
                c.addOrUpdateSubnetQueue.Add(subnet.Name)
×
318
        }
319

320
        // ovn acl doesn't support address_set name with '-', so replace '-' by '.'
321
        pgName := strings.ReplaceAll(portName, "-", ".")
×
322
        if err = c.OVNNbClient.CreatePortGroup(pgName, map[string]string{"node": node.Name, networkPolicyKey: "node" + "/" + key}); err != nil {
×
323
                klog.Errorf("create port group %s for node %s: %v", pgName, key, err)
×
324
                return err
×
325
        }
×
326

327
        if err := c.addPolicyRouteForCentralizedSubnetOnNode(node.Name, ipStr); err != nil {
×
328
                klog.Errorf("failed to add policy route for node %s, %v", key, err)
×
329
                return err
×
330
        }
×
331

332
        if err := c.UpdateChassisTag(node); err != nil {
×
333
                klog.Errorf("failed to update chassis tag for node %s: %v", node.Name, err)
×
334
                return err
×
335
        }
×
336

337
        if err := c.retryDelDupChassis(util.ChassisRetryMaxTimes, util.ChassisControllerRetryInterval, c.cleanDuplicatedChassis, node); err != nil {
×
338
                klog.Errorf("failed to clean duplicated chassis for node %s: %v", node.Name, err)
×
339
                return err
×
340
        }
×
341
        return nil
×
342
}
343

344
func (c *Controller) handleNodeAnnotationsForProviderNetworks(node *v1.Node) error {
×
345
        providerNetworks, err := c.providerNetworksLister.List(labels.Everything())
×
346
        if err != nil && !k8serrors.IsNotFound(err) {
×
347
                klog.Errorf("failed to list provider networks: %v", err)
×
348
                return err
×
349
        }
×
350

351
        for _, pn := range providerNetworks {
×
352
                excludeAnno := fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name)
×
353
                interfaceAnno := fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)
×
354

×
355
                var newPn *kubeovnv1.ProviderNetwork
×
356
                excluded := slices.Contains(pn.Spec.ExcludeNodes, node.Name)
×
357
                if !excluded && len(node.Annotations) != 0 && node.Annotations[excludeAnno] == "true" {
×
358
                        newPn = pn.DeepCopy()
×
359
                        newPn.Spec.ExcludeNodes = append(newPn.Spec.ExcludeNodes, node.Name)
×
360
                        excluded = true
×
361
                }
×
362

363
                var customInterface string
×
364
                for _, v := range pn.Spec.CustomInterfaces {
×
365
                        if slices.Contains(v.Nodes, node.Name) {
×
366
                                customInterface = v.Interface
×
367
                                break
×
368
                        }
369
                }
370
                if customInterface == "" && len(node.Annotations) != 0 {
×
371
                        if customInterface = node.Annotations[interfaceAnno]; customInterface != "" {
×
372
                                if newPn == nil {
×
373
                                        newPn = pn.DeepCopy()
×
374
                                }
×
375
                                var index *int
×
376
                                for i := range newPn.Spec.CustomInterfaces {
×
377
                                        if newPn.Spec.CustomInterfaces[i].Interface == customInterface {
×
378
                                                index = &i
×
379
                                                break
×
380
                                        }
381
                                }
382
                                if index != nil {
×
383
                                        newPn.Spec.CustomInterfaces[*index].Nodes = append(newPn.Spec.CustomInterfaces[*index].Nodes, node.Name)
×
384
                                } else {
×
385
                                        ci := kubeovnv1.CustomInterface{Interface: customInterface, Nodes: []string{node.Name}}
×
386
                                        newPn.Spec.CustomInterfaces = append(newPn.Spec.CustomInterfaces, ci)
×
387
                                }
×
388
                        }
389
                }
390

391
                if newPn != nil {
×
392
                        if newPn, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Update(context.Background(), newPn, metav1.UpdateOptions{}); err != nil {
×
393
                                klog.Errorf("failed to update provider network %s: %v", pn.Name, err)
×
394
                                return err
×
395
                        }
×
396
                }
397

398
                if len(node.Annotations) != 0 {
×
399
                        patch := util.KVPatch{excludeAnno: nil, interfaceAnno: nil}
×
400
                        if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
401
                                klog.Errorf("failed to patch node %s: %v", node.Name, err)
×
402
                                return err
×
403
                        }
×
404
                }
405

406
                if excluded {
×
407
                        if newPn == nil {
×
408
                                newPn = pn.DeepCopy()
×
409
                        } else {
×
410
                                newPn = newPn.DeepCopy()
×
411
                        }
×
412

413
                        if newPn.Status.EnsureNodeStandardConditions(node.Name) {
×
414
                                _, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().UpdateStatus(context.Background(), newPn, metav1.UpdateOptions{})
×
415
                                if err != nil {
×
416
                                        klog.Errorf("failed to update status of provider network %s: %v", pn.Name, err)
×
417
                                        return err
×
418
                                }
×
419
                        }
420
                }
421
        }
422

423
        return nil
×
424
}
425

426
func (c *Controller) handleDeleteNode(key string) (err error) {
×
427
        c.nodeKeyMutex.LockKey(key)
×
428
        defer func() {
×
429
                _ = c.nodeKeyMutex.UnlockKey(key)
×
430
                if err == nil {
×
431
                        c.deletingNodeObjMap.Delete(key)
×
432
                }
×
433
        }()
434
        klog.Infof("handle delete node %s", key)
×
435

×
436
        node, ok := c.deletingNodeObjMap.Load(key)
×
437
        if !ok {
×
438
                return nil
×
439
        }
×
440
        n, _ := c.nodesLister.Get(key)
×
441
        if n != nil && n.UID != node.UID {
×
442
                klog.Warningf("Node %s is adding, skip the node delete handler, but it may leave some gc resources behind", key)
×
443
                return nil
×
444
        }
×
445
        return c.deleteNode(key)
×
446
}
447

448
func (c *Controller) deleteNode(key string) error {
×
449
        portName := util.NodeLspName(key)
×
450
        klog.Infof("delete logical switch port %s", portName)
×
451
        if err := c.OVNNbClient.DeleteLogicalSwitchPort(portName); err != nil {
×
452
                klog.Errorf("failed to delete node switch port %s: %v", portName, err)
×
453
                return err
×
454
        }
×
455
        if err := c.OVNSbClient.DeleteChassisByHost(key); err != nil {
×
456
                klog.Errorf("failed to delete chassis for node %s: %v", key, err)
×
457
                return err
×
458
        }
×
459

460
        for _, af := range [...]int{4, 6} {
×
461
                if err := c.deletePolicyRouteForLocalDNSCacheOnNode(key, af); err != nil {
×
462
                        klog.Error(err)
×
463
                        return err
×
464
                }
×
465
        }
466

467
        // ovn acl doesn't support address_set name with '-', so replace '-' by '.'
468
        pgName := strings.ReplaceAll(portName, "-", ".")
×
469
        if err := c.OVNNbClient.DeletePortGroup(pgName); err != nil {
×
470
                klog.Errorf("delete port group %s for node: %v", portName, err)
×
471
                return err
×
472
        }
×
473

474
        if err := c.deletePolicyRouteForNode(key, portName); err != nil {
×
475
                klog.Errorf("failed to delete policy route for node %s: %v", key, err)
×
476
                return err
×
477
        }
×
478

479
        if err := c.OVNNbClient.DeleteAddressSet(nodeUnderlayAddressSetName(key, 4)); err != nil {
×
480
                klog.Errorf("failed to delete address set for node %s: %v", key, err)
×
481
                return err
×
482
        }
×
483
        if err := c.OVNNbClient.DeleteAddressSet(nodeUnderlayAddressSetName(key, 6)); err != nil {
×
484
                klog.Errorf("failed to delete address set for node %s: %v", key, err)
×
485
                return err
×
486
        }
×
487

488
        klog.Infof("release node port %s", portName)
×
489
        c.ipam.ReleaseAddressByPod(portName, c.config.NodeSwitch)
×
490

×
491
        providerNetworks, err := c.providerNetworksLister.List(labels.Everything())
×
492
        if err != nil && !k8serrors.IsNotFound(err) {
×
493
                klog.Errorf("failed to list provider networks: %v", err)
×
494
                return err
×
495
        }
×
496

497
        for _, pn := range providerNetworks {
×
498
                if err = c.updateProviderNetworkForNodeDeletion(pn, key); err != nil {
×
499
                        klog.Error(err)
×
500
                        return err
×
501
                }
×
502
        }
503
        klog.Infof("delete node ip %s", portName)
×
504
        if err = c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), portName, metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) {
×
505
                return err
×
506
        }
×
507

508
        return nil
×
509
}
510

511
func (c *Controller) updateProviderNetworkForNodeDeletion(pn *kubeovnv1.ProviderNetwork, node string) error {
×
512
        // update provider network status
×
513
        var needUpdate bool
×
514
        newPn := pn.DeepCopy()
×
515
        if slices.Contains(newPn.Status.ReadyNodes, node) {
×
516
                newPn.Status.ReadyNodes = util.RemoveString(newPn.Status.ReadyNodes, node)
×
517
                needUpdate = true
×
518
        }
×
519
        if newPn.Status.RemoveNodeConditions(node) {
×
520
                needUpdate = true
×
521
        }
×
522
        if needUpdate {
×
523
                var err error
×
524
                newPn, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().UpdateStatus(context.Background(), newPn, metav1.UpdateOptions{})
×
525
                if err != nil {
×
526
                        klog.Errorf("failed to update status of provider network %s: %v", pn.Name, err)
×
527
                        return err
×
528
                }
×
529
        }
530

531
        // update provider network spec
532
        pn, newPn = newPn, nil
×
533
        if excludeNodes := util.RemoveString(pn.Spec.ExcludeNodes, node); len(excludeNodes) != len(pn.Spec.ExcludeNodes) {
×
534
                newPn = pn.DeepCopy()
×
535
                newPn.Spec.ExcludeNodes = excludeNodes
×
536
        }
×
537

538
        var changed bool
×
539
        customInterfaces := make([]kubeovnv1.CustomInterface, 0, len(pn.Spec.CustomInterfaces))
×
540
        for _, ci := range pn.Spec.CustomInterfaces {
×
541
                nodes := util.RemoveString(ci.Nodes, node)
×
542
                if !changed {
×
543
                        changed = len(nodes) == 0 || len(nodes) != len(ci.Nodes)
×
544
                }
×
545
                if len(nodes) != 0 {
×
546
                        customInterfaces = append(customInterfaces, kubeovnv1.CustomInterface{Interface: ci.Interface, Nodes: nodes})
×
547
                }
×
548
        }
549
        if changed {
×
550
                newPn = pn.DeepCopy()
×
551
                newPn.Spec.CustomInterfaces = customInterfaces
×
552
        }
×
553
        if newPn != nil {
×
554
                if _, err := c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Update(context.Background(), newPn, metav1.UpdateOptions{}); err != nil {
×
555
                        klog.Errorf("failed to update provider network %s: %v", pn.Name, err)
×
556
                        return err
×
557
                }
×
558
        }
559

560
        return nil
×
561
}
562

563
func (c *Controller) handleUpdateNode(key string) error {
×
564
        c.nodeKeyMutex.LockKey(key)
×
565
        defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
×
566
        klog.Infof("handle update node %s", key)
×
567

×
568
        node, err := c.nodesLister.Get(key)
×
569
        if err != nil {
×
570
                if k8serrors.IsNotFound(err) {
×
571
                        return nil
×
572
                }
×
573
                klog.Errorf("failed to get node %s: %v", key, err)
×
574
                return err
×
575
        }
576

577
        if err = c.handleNodeAnnotationsForProviderNetworks(node); err != nil {
×
578
                klog.Errorf("failed to handle annotations of node %s for provider networks: %v", node.Name, err)
×
579
                return err
×
580
        }
×
581

582
        subnets, err := c.subnetsLister.List(labels.Everything())
×
583
        if err != nil {
×
584
                klog.Errorf("failed to get subnets %v", err)
×
585
                return err
×
586
        }
×
587

588
        if err := c.UpdateChassisTag(node); err != nil {
×
589
                klog.Errorf("failed to update chassis tag for node %s: %v", node.Name, err)
×
590
                return err
×
591
        }
×
592
        if err := c.retryDelDupChassis(util.ChassisRetryMaxTimes, util.ChassisControllerRetryInterval, c.cleanDuplicatedChassis, node); err != nil {
×
593
                klog.Errorf("failed to clean duplicated chassis for node %s: %v", node.Name, err)
×
594
                return err
×
595
        }
×
596

597
        for _, cachedSubnet := range subnets {
×
598
                if cachedSubnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
×
599
                        // we need to reconcile ovn route for subnets with distributed gateway mode,
×
600
                        // since the informer node cache may not be synced before the subnet reconcile triggered by node addition
×
601
                        c.addOrUpdateSubnetQueue.Add(cachedSubnet.Name)
×
602
                        continue
×
603
                }
604
                subnet := cachedSubnet.DeepCopy()
×
605
                if util.GatewayContains(subnet.Spec.GatewayNode, node.Name) {
×
606
                        if err := c.reconcileOvnDefaultVpcRoute(subnet); err != nil {
×
607
                                klog.Error(err)
×
608
                                return err
×
609
                        }
×
610
                }
611
        }
612

613
        return nil
×
614
}
615

616
func (c *Controller) checkSubnetGateway() {
×
617
        if err := c.checkSubnetGatewayNode(); err != nil {
×
618
                klog.Errorf("failed to check subnet gateway node: %v", err)
×
619
        }
×
620
}
621

622
func (c *Controller) checkSubnetGatewayNode() error {
×
623
        klog.V(3).Infoln("start to check subnet gateway node")
×
624
        subnetList, err := c.subnetsLister.List(labels.Everything())
×
625
        if err != nil {
×
626
                klog.Errorf("failed to list subnets: %v", err)
×
627
                return err
×
628
        }
×
629
        nodes, err := c.nodesLister.List(labels.Everything())
×
630
        if err != nil {
×
631
                klog.Errorf("failed to list nodes: %v", err)
×
632
                return err
×
633
        }
×
634

635
        for _, subnet := range subnetList {
×
636
                if (subnet.Spec.Vlan != "" && (subnet.Spec.U2OInterconnection || !subnet.Spec.LogicalGateway)) ||
×
637
                        subnet.Spec.Vpc != c.config.ClusterRouter ||
×
638
                        subnet.Name == c.config.NodeSwitch ||
×
639
                        subnet.Spec.GatewayNode == "" ||
×
640
                        subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType ||
×
641
                        !subnet.Spec.EnableEcmp {
×
642
                        continue
×
643
                }
644
                gwNodes := strings.Split(subnet.Spec.GatewayNode, ",")
×
645
                if len(gwNodes) < 2 {
×
646
                        continue
×
647
                }
648

649
                for cidrBlock := range strings.SplitSeq(subnet.Spec.CIDRBlock, ",") {
×
650
                        nextHops, nameIPMap, err := c.getPolicyRouteParas(cidrBlock, util.GatewayRouterPolicyPriority)
×
651
                        if err != nil {
×
652
                                klog.Errorf("failed to get ecmp policy route paras for subnet %s: %v", subnet.Name, err)
×
653
                                continue
×
654
                        }
655
                        for _, node := range nodes {
×
656
                                ipStr := node.Annotations[util.IPAddressAnnotation]
×
657
                                for ip := range strings.SplitSeq(ipStr, ",") {
×
658
                                        if util.CheckProtocol(cidrBlock) != util.CheckProtocol(ip) {
×
659
                                                continue
×
660
                                        }
661

662
                                        exist := nameIPMap[node.Name] == ip
×
663
                                        if util.GatewayContains(subnet.Spec.GatewayNode, node.Name) {
×
664
                                                pinger, err := goping.NewPinger(ip)
×
665
                                                if err != nil {
×
666
                                                        return fmt.Errorf("failed to init pinger, %w", err)
×
667
                                                }
×
668
                                                pinger.SetPrivileged(true)
×
669

×
670
                                                count := 5
×
671
                                                pinger.Count = count
×
672
                                                pinger.Timeout = time.Duration(count) * time.Second
×
673
                                                pinger.Interval = 1 * time.Second
×
674

×
675
                                                var pingSucceeded bool
×
676
                                                pinger.OnRecv = func(_ *goping.Packet) {
×
677
                                                        pingSucceeded = true
×
678
                                                        pinger.Stop()
×
679
                                                }
×
680
                                                if err = pinger.Run(); err != nil {
×
681
                                                        klog.Errorf("failed to run pinger for destination %s: %v", ip, err)
×
682
                                                        return err
×
683
                                                }
×
684

685
                                                nodeIsReady := nodeReady(node)
×
686
                                                if !pingSucceeded || !nodeIsReady {
×
687
                                                        if exist {
×
688
                                                                if !pingSucceeded {
×
689
                                                                        klog.Warningf("failed to ping %s ip %s on node %s", util.NodeNic, ip, node.Name)
×
690
                                                                }
×
691
                                                                if !nodeIsReady {
×
692
                                                                        klog.Warningf("node %s is not ready", node.Name)
×
693
                                                                }
×
694
                                                                klog.Warningf("delete ecmp policy route for node %s ip %s", node.Name, ip)
×
695
                                                                nextHops.Remove(ip)
×
696
                                                                delete(nameIPMap, node.Name)
×
697
                                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
698
                                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
699
                                                                        klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
×
700
                                                                        return err
×
701
                                                                }
×
702
                                                        }
703
                                                } else {
×
704
                                                        klog.V(3).Infof("succeeded to ping %s ip %s on node %s", util.NodeNic, ip, node.Name)
×
705
                                                        if !exist {
×
706
                                                                nextHops.Add(ip)
×
707
                                                                if nameIPMap == nil {
×
708
                                                                        nameIPMap = make(map[string]string, 1)
×
709
                                                                }
×
710
                                                                nameIPMap[node.Name] = ip
×
711
                                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
712
                                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
713
                                                                        klog.Errorf("failed to add ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
×
714
                                                                        return err
×
715
                                                                }
×
716
                                                        }
717
                                                }
718
                                        } else if exist {
×
719
                                                klog.Infof("subnet %s gateway nodes does not contain node %s, delete policy route for node ip %s", subnet.Name, node.Name, ip)
×
720
                                                nextHops.Remove(ip)
×
721
                                                delete(nameIPMap, node.Name)
×
722
                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
723
                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
724
                                                        klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
×
725
                                                        return err
×
726
                                                }
×
727
                                        }
728
                                }
729
                        }
730
                }
731
        }
732
        return nil
×
733
}
734

735
func (c *Controller) cleanDuplicatedChassis(node *v1.Node) error {
×
736
        // if multi chassis has the same node name, delete all of them
×
737
        var err error
×
738
        if _, err := c.OVNSbClient.GetChassisByHost(node.Name); err == nil {
×
739
                return nil
×
740
        }
×
741
        klog.Errorf("failed to get chassis for node %s: %v", node.Name, err)
×
742
        if errors.Is(err, ovs.ErrOneNodeMultiChassis) {
×
743
                klog.Warningf("node %s has multiple chassis", node.Name)
×
744
                if err := c.OVNSbClient.DeleteChassisByHost(node.Name); err != nil {
×
745
                        klog.Errorf("failed to delete chassis for node %s: %v", node.Name, err)
×
746
                        return err
×
747
                }
×
748
        }
749
        return err
×
750
}
751

752
func (c *Controller) retryDelDupChassis(attempts, sleep int, f func(node *v1.Node) error, node *v1.Node) (err error) {
×
753
        i := 0
×
754
        for ; ; i++ {
×
755
                err = f(node)
×
756
                if err == nil {
×
757
                        return err
×
758
                }
×
759
                klog.Errorf("failed to delete duplicated chassis for node %s: %v", node.Name, err)
×
760
                if i >= (attempts - 1) {
×
761
                        break
×
762
                }
763
                time.Sleep(time.Duration(sleep) * time.Second)
×
764
        }
765
        if i >= (attempts - 1) {
×
766
                errMsg := errors.New("exhausting all attempts")
×
767
                klog.Error(errMsg)
×
768
                return errMsg
×
769
        }
×
770
        klog.V(3).Infof("finish check chassis")
×
771
        return nil
×
772
}
773

774
func (c *Controller) fetchPodsOnNode(nodeName string, pods []*v1.Pod) ([]string, error) {
×
775
        ports := make([]string, 0, len(pods))
×
776
        for _, pod := range pods {
×
777
                if !isPodAlive(pod) || pod.Spec.HostNetwork || pod.Spec.NodeName != nodeName {
×
778
                        continue
×
779
                }
780

781
                if pod.Annotations[util.LogicalRouterAnnotation] != c.config.ClusterRouter {
×
782
                        subnetName := pod.Annotations[util.LogicalSwitchAnnotation]
×
783
                        if subnetName == "" {
×
784
                                klog.V(4).Infof("Pod %s/%s is not on cluster router and has no logical switch annotation, skipping for VLAN check.", pod.Namespace, pod.Name)
×
785
                                continue
×
786
                        }
787

788
                        subnet, err := c.subnetsLister.Get(subnetName)
×
789
                        if err != nil {
×
790
                                klog.Errorf("failed to get subnet %s: %v", subnetName, err)
×
791
                                return nil, err
×
792
                        }
×
793

794
                        if subnet.Spec.Vlan == "" {
×
795
                                continue
×
796
                        }
797
                }
798

799
                podName := c.getNameByPod(pod)
×
800

×
801
                podNets, err := c.getPodKubeovnNets(pod)
×
802
                if err != nil {
×
803
                        klog.Errorf("failed to get pod nets %v", err)
×
804
                        return nil, err
×
805
                }
×
806

807
                for _, podNet := range podNets {
×
808
                        if !isOvnSubnet(podNet.Subnet) {
×
809
                                continue
×
810
                        }
811

812
                        if pod.Annotations != nil && pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "true" {
×
813
                                ports = append(ports, ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName))
×
814
                        }
×
815
                }
816
        }
817
        return ports, nil
×
818
}
819

820
func (c *Controller) CheckNodePortGroup() {
×
821
        if err := c.checkAndUpdateNodePortGroup(); err != nil {
×
822
                klog.Errorf("check node port group status: %v", err)
×
823
        }
×
824
}
825

826
func (c *Controller) checkAndUpdateNodePortGroup() error {
×
827
        klog.V(3).Infoln("start to check node port-group status")
×
828
        var networkPolicyExists bool
×
829
        if c.config.EnableNP {
×
830
                np, _ := c.npsLister.List(labels.Everything())
×
831
                networkPolicyExists = len(np) != 0
×
832
        }
×
833

834
        nodes, err := c.nodesLister.List(labels.Everything())
×
835
        if err != nil {
×
836
                klog.Errorf("list nodes: %v", err)
×
837
                return err
×
838
        }
×
839

840
        pods, err := c.podsLister.List(labels.Everything())
×
841
        if err != nil {
×
842
                klog.Errorf("list pods, %v", err)
×
843
                return err
×
844
        }
×
845

846
        for _, node := range nodes {
×
847
                // The port-group should already created when add node
×
848
                pgName := strings.ReplaceAll(node.Annotations[util.PortNameAnnotation], "-", ".")
×
849

×
850
                // use join IP only when no internal IP exists
×
851
                nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
×
852
                joinIP := node.Annotations[util.IPAddressAnnotation]
×
853
                joinIPv4, joinIPv6 := util.SplitStringIP(joinIP)
×
854
                if nodeIPv4 == "" {
×
855
                        nodeIPv4 = joinIPv4
×
856
                }
×
857
                if nodeIPv6 == "" {
×
858
                        nodeIPv6 = joinIPv6
×
859
                }
×
860
                nodeIP := strings.Trim(fmt.Sprintf("%s,%s", nodeIPv4, nodeIPv6), ",")
×
861

×
862
                nodePorts, err := c.fetchPodsOnNode(node.Name, pods)
×
863
                if err != nil {
×
864
                        klog.Errorf("fetch pods for node %v: %v", node.Name, err)
×
865
                        return err
×
866
                }
×
867

868
                if err = c.OVNNbClient.PortGroupSetPorts(pgName, nodePorts); err != nil {
×
869
                        klog.Errorf("failed to set ports of port group %s: %v", pgName, err)
×
870
                        return err
×
871
                }
×
872

873
                if networkPolicyExists {
×
874
                        if err := c.OVNNbClient.CreateNodeACL(pgName, nodeIP, joinIP); err != nil {
×
875
                                klog.Errorf("create node acl for node pg %s: %v", pgName, err)
×
876
                        }
×
877
                } else {
×
878
                        // clear all acl
×
879
                        if err = c.OVNNbClient.DeleteAcls(pgName, portGroupKey, "", nil); err != nil {
×
880
                                klog.Errorf("delete node acl for node pg %s: %v", pgName, err)
×
881
                        }
×
882
                }
883
        }
884

885
        return nil
×
886
}
887

888
func (c *Controller) UpdateChassisTag(node *v1.Node) error {
×
889
        annoChassisName := node.Annotations[util.ChassisAnnotation]
×
890
        if annoChassisName == "" {
×
891
                // kube-ovn-cni not ready to set chassis
×
892
                return nil
×
893
        }
×
894
        chassis, err := c.OVNSbClient.GetChassis(annoChassisName, true)
×
895
        if err != nil {
×
896
                klog.Errorf("failed to get chassis %s for node %s: %v", annoChassisName, node.Name, err)
×
897
                return err
×
898
        }
×
899
        if chassis == nil {
×
900
                klog.Infof("chassis %q not registered for node %s, do chassis gc once", annoChassisName, node.Name)
×
901
                // chassis name conflict, do GC
×
902
                if err = c.gcChassis(); err != nil {
×
903
                        klog.Errorf("failed to gc chassis: %v", err)
×
904
                        return err
×
905
                }
×
906
                err = &ErrChassisNotFound{Chassis: annoChassisName, Node: node.Name}
×
907
                klog.Error(err)
×
908
                return err
×
909
        }
910

911
        if chassis.ExternalIDs == nil || chassis.ExternalIDs["vendor"] != util.CniTypeName {
×
912
                klog.Infof("init tag %s for node %s chassis %s", util.CniTypeName, node.Name, chassis.Name)
×
913
                if err = c.OVNSbClient.UpdateChassisTag(chassis.Name, node.Name); err != nil {
×
914
                        err := fmt.Errorf("failed to init chassis tag, %w", err)
×
915
                        klog.Error(err)
×
916
                        return err
×
917
                }
×
918
        }
919
        return nil
×
920
}
921

922
func (c *Controller) addNodeGatewayStaticRoute() error {
×
923
        // If user not manage static route for default vpc, just add route about ovn-default to join
×
924
        if vpc, err := c.vpcsLister.Get(c.config.ClusterRouter); err != nil || vpc.Spec.StaticRoutes != nil {
×
925
                existRoute, err := c.OVNNbClient.ListLogicalRouterStaticRoutes(c.config.ClusterRouter, nil, nil, "", nil)
×
926
                if err != nil {
×
927
                        klog.Errorf("failed to get vpc %s static route list, %v", c.config.ClusterRouter, err)
×
928
                }
×
929
                if len(existRoute) != 0 {
×
930
                        klog.Infof("skip add static route for node gw")
×
931
                        return nil
×
932
                }
×
933
        }
934
        dstCidr := "0.0.0.0/0,::/0"
×
935
        for cidrBlock := range strings.SplitSeq(dstCidr, ",") {
×
936
                for nextHop := range strings.SplitSeq(c.config.NodeSwitchGateway, ",") {
×
937
                        if util.CheckProtocol(cidrBlock) != util.CheckProtocol(nextHop) {
×
938
                                continue
×
939
                        }
940

941
                        if err := c.addStaticRouteToVpc(
×
942
                                c.config.ClusterRouter,
×
943
                                &kubeovnv1.StaticRoute{
×
944
                                        Policy:     kubeovnv1.PolicyDst,
×
945
                                        CIDR:       cidrBlock,
×
946
                                        NextHopIP:  nextHop,
×
947
                                        RouteTable: util.MainRouteTable,
×
948
                                },
×
949
                        ); err != nil {
×
950
                                klog.Errorf("failed to add static route for node gw: %v", err)
×
951
                                return err
×
952
                        }
×
953
                }
954
        }
955
        return nil
×
956
}
957

958
func (c *Controller) getPolicyRouteParas(cidr string, priority int) (*strset.Set, map[string]string, error) {
×
959
        ipSuffix := "ip4"
×
960
        if util.CheckProtocol(cidr) == kubeovnv1.ProtocolIPv6 {
×
961
                ipSuffix = "ip6"
×
962
        }
×
963
        match := fmt.Sprintf("%s.src == %s", ipSuffix, cidr)
×
964
        policyList, err := c.OVNNbClient.GetLogicalRouterPolicy(c.config.ClusterRouter, priority, match, true)
×
965
        if err != nil {
×
966
                klog.Errorf("failed to get logical router policy: %v", err)
×
967
                return nil, nil, err
×
968
        }
×
969
        if len(policyList) == 0 {
×
970
                return strset.New(), map[string]string{}, nil
×
971
        }
×
972
        return strset.New(policyList[0].Nexthops...), policyList[0].ExternalIDs, nil
×
973
}
974

975
func (c *Controller) deletePolicyRouteForNode(nodeName, portName string) error {
×
976
        subnets, err := c.subnetsLister.List(labels.Everything())
×
977
        if err != nil {
×
978
                klog.Errorf("get subnets: %v", err)
×
979
                return err
×
980
        }
×
981

982
        addresses := c.ipam.GetPodAddress(portName)
×
983
        for _, addr := range addresses {
×
984
                if addr.IP == "" {
×
985
                        continue
×
986
                }
987
                klog.Infof("deleting logical router policy with nexthop %q from %s for node %s", addr.IP, c.config.ClusterRouter, nodeName)
×
988
                if err = c.OVNNbClient.DeleteLogicalRouterPolicyByNexthop(c.config.ClusterRouter, util.NodeRouterPolicyPriority, addr.IP); err != nil {
×
989
                        klog.Errorf("failed to delete logical router policy with nexthop %q from %s for node %s: %v", addr.IP, c.config.ClusterRouter, nodeName, err)
×
990
                        return err
×
991
                }
×
992
        }
993

994
        for _, subnet := range subnets {
×
995
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch {
×
996
                        continue
×
997
                }
998

999
                if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
×
1000
                        pgName := getOverlaySubnetsPortGroupName(subnet.Name, nodeName)
×
1001
                        if err = c.OVNNbClient.DeletePortGroup(pgName); err != nil {
×
1002
                                klog.Errorf("delete port group for subnet %s and node %s: %v", subnet.Name, nodeName, err)
×
1003
                                return err
×
1004
                        }
×
1005

1006
                        klog.Infof("delete policy route for distributed subnet %s, node %s", subnet.Name, nodeName)
×
1007
                        if err = c.deletePolicyRouteForDistributedSubnet(subnet, nodeName); err != nil {
×
1008
                                klog.Errorf("delete policy route for subnet %s and node %s: %v", subnet.Name, nodeName, err)
×
1009
                                return err
×
1010
                        }
×
1011
                }
1012

1013
                if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType {
×
1014
                        if subnet.Spec.EnableEcmp {
×
1015
                                for cidrBlock := range strings.SplitSeq(subnet.Spec.CIDRBlock, ",") {
×
1016
                                        nextHops, nameIPMap, err := c.getPolicyRouteParas(cidrBlock, util.GatewayRouterPolicyPriority)
×
1017
                                        if err != nil {
×
1018
                                                klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
×
1019
                                                continue
×
1020
                                        }
1021

1022
                                        exist := false
×
1023
                                        if _, ok := nameIPMap[nodeName]; ok {
×
1024
                                                exist = true
×
1025
                                        }
×
1026

1027
                                        if exist {
×
1028
                                                nextHops.Remove(nameIPMap[nodeName])
×
1029
                                                delete(nameIPMap, nodeName)
×
1030

×
1031
                                                if nextHops.Size() == 0 {
×
1032
                                                        klog.Infof("delete policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
1033
                                                        if err := c.deletePolicyRouteForCentralizedSubnet(subnet); err != nil {
×
1034
                                                                klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
×
1035
                                                                return err
×
1036
                                                        }
×
1037
                                                } else {
×
1038
                                                        klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
1039
                                                        if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
1040
                                                                klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
×
1041
                                                                return err
×
1042
                                                        }
×
1043
                                                }
1044
                                        }
1045
                                }
1046
                        } else {
×
1047
                                klog.Infof("reconcile policy route for centralized subnet %s", subnet.Name)
×
1048
                                if err := c.reconcileDefaultCentralizedSubnetRouteInDefaultVpc(subnet); err != nil {
×
1049
                                        klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
×
1050
                                        return err
×
1051
                                }
×
1052
                        }
1053
                }
1054
        }
1055
        return nil
×
1056
}
1057

1058
func (c *Controller) addPolicyRouteForCentralizedSubnetOnNode(nodeName, nodeIP string) error {
×
1059
        subnets, err := c.subnetsLister.List(labels.Everything())
×
1060
        if err != nil {
×
1061
                klog.Errorf("failed to get subnets %v", err)
×
1062
                return err
×
1063
        }
×
1064

1065
        for _, subnet := range subnets {
×
1066
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType {
×
1067
                        continue
×
1068
                }
1069

1070
                if subnet.Spec.EnableEcmp {
×
1071
                        if !util.GatewayContains(subnet.Spec.GatewayNode, nodeName) {
×
1072
                                continue
×
1073
                        }
1074

1075
                        for nextHop := range strings.SplitSeq(nodeIP, ",") {
×
1076
                                for cidrBlock := range strings.SplitSeq(subnet.Spec.CIDRBlock, ",") {
×
1077
                                        if util.CheckProtocol(cidrBlock) != util.CheckProtocol(nextHop) {
×
1078
                                                continue
×
1079
                                        }
1080

1081
                                        nextHops, nameIPMap, err := c.getPolicyRouteParas(cidrBlock, util.GatewayRouterPolicyPriority)
×
1082
                                        if err != nil {
×
1083
                                                klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
×
1084
                                                continue
×
1085
                                        }
1086
                                        if nameIPMap[nodeName] == nextHop {
×
1087
                                                continue
×
1088
                                        }
1089

1090
                                        nextHops.Add(nextHop)
×
1091
                                        if nameIPMap == nil {
×
1092
                                                nameIPMap = make(map[string]string, 1)
×
1093
                                        }
×
1094
                                        nameIPMap[nodeName] = nextHop
×
1095
                                        klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
1096
                                        if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
1097
                                                klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
×
1098
                                                return err
×
1099
                                        }
×
1100
                                }
1101
                        }
1102
                } else {
×
1103
                        if subnet.Status.ActivateGateway != nodeName {
×
1104
                                continue
×
1105
                        }
1106
                        klog.Infof("add policy route for centralized subnet %s, on node %s, ip %s", subnet.Name, nodeName, nodeIP)
×
1107
                        if err = c.addPolicyRouteForCentralizedSubnet(subnet, nodeName, nil, strings.Split(nodeIP, ",")); err != nil {
×
1108
                                klog.Errorf("failed to add active-backup policy route for centralized subnet %s: %v", subnet.Name, err)
×
1109
                                return err
×
1110
                        }
×
1111
                }
1112
        }
1113
        return nil
×
1114
}
1115

1116
func (c *Controller) addPolicyRouteForLocalDNSCacheOnNode(dnsIPs []string, nodePortName, nodeIP, nodeName string, af int) error {
×
1117
        if len(dnsIPs) == 0 {
×
1118
                return c.deletePolicyRouteForLocalDNSCacheOnNode(nodeName, af)
×
1119
        }
×
1120

1121
        var (
×
1122
                externalIDs = map[string]string{
×
1123
                        "vendor":          util.CniTypeName,
×
1124
                        "node":            nodeName,
×
1125
                        "address-family":  strconv.Itoa(af),
×
1126
                        "isLocalDnsCache": "true",
×
1127
                }
×
1128
                pgAs     = strings.ReplaceAll(fmt.Sprintf("%s_ip%d", nodePortName, af), "-", ".")
×
1129
                action   = kubeovnv1.PolicyRouteActionReroute
×
1130
                nextHops = []string{nodeIP}
×
1131
        )
×
1132
        matches := strset.NewWithSize(len(dnsIPs))
×
1133
        for _, ip := range dnsIPs {
×
1134
                matches.Add(fmt.Sprintf("ip%d.src == $%s && ip%d.dst == %s", af, pgAs, af, ip))
×
1135
        }
×
1136

1137
        policies, err := c.OVNNbClient.GetLogicalRouterPoliciesByExtID(c.config.ClusterRouter, "node", nodeName)
×
1138
        if err != nil {
×
1139
                klog.Errorf("failed to list logical router policies with external-ids:node = %q: %v", nodeName, err)
×
1140
                return err
×
1141
        }
×
1142

1143
        for _, policy := range policies {
×
1144
                if len(policy.ExternalIDs) == 0 || policy.ExternalIDs["vendor"] != util.CniTypeName || policy.ExternalIDs["isLocalDnsCache"] != "true" {
×
1145
                        continue
×
1146
                }
1147
                if policy.Priority == util.NodeRouterPolicyPriority && policy.Action == string(action) && slices.Equal(policy.Nexthops, nextHops) && matches.Has(policy.Match) {
×
1148
                        matches.Remove(policy.Match)
×
1149
                        continue
×
1150
                }
1151
                // delete unused policy router policy
1152
                klog.Infof("deleting logical router policy by UUID %s", policy.UUID)
×
1153
                if err = c.OVNNbClient.DeleteLogicalRouterPolicyByUUID(c.config.ClusterRouter, policy.UUID); err != nil {
×
1154
                        klog.Errorf("failed to delete logical router policy by UUID %s: %v", policy.UUID, err)
×
1155
                        return err
×
1156
                }
×
1157
        }
1158

1159
        for _, match := range matches.List() {
×
1160
                klog.Infof("add node local dns cache policy route for router %s: match %q, action %q, nexthop %q, externalID %v", c.config.ClusterRouter, match, action, nodeIP, externalIDs)
×
1161
                if err := c.addPolicyRouteToVpc(
×
1162
                        c.config.ClusterRouter,
×
1163
                        &kubeovnv1.PolicyRoute{
×
1164
                                Priority:  util.NodeRouterPolicyPriority,
×
1165
                                Match:     match,
×
1166
                                Action:    action,
×
1167
                                NextHopIP: nodeIP,
×
1168
                        },
×
1169
                        externalIDs,
×
1170
                ); err != nil {
×
1171
                        klog.Errorf("failed to add logical router policy for node %s: %v", nodeName, err)
×
1172
                        return err
×
1173
                }
×
1174
        }
1175

1176
        return nil
×
1177
}
1178

1179
func (c *Controller) deletePolicyRouteForLocalDNSCacheOnNode(nodeName string, af int) error {
×
1180
        policies, err := c.OVNNbClient.ListLogicalRouterPolicies(c.config.ClusterRouter, -1, map[string]string{
×
1181
                "vendor":          util.CniTypeName,
×
1182
                "node":            nodeName,
×
1183
                "address-family":  strconv.Itoa(af),
×
1184
                "isLocalDnsCache": "true",
×
1185
        }, true)
×
1186
        if err != nil {
×
1187
                klog.Errorf("failed to list logical router policies: %v", err)
×
1188
                return err
×
1189
        }
×
1190
        if len(policies) == 0 {
×
1191
                return nil
×
1192
        }
×
1193

1194
        for _, policy := range policies {
×
1195
                klog.Infof("delete node local dns cache policy route for router %s with match %s", c.config.ClusterRouter, policy.Match)
×
1196

×
1197
                if err := c.OVNNbClient.DeleteLogicalRouterPolicyByUUID(c.config.ClusterRouter, policy.UUID); err != nil {
×
1198
                        klog.Errorf("failed to delete policy route for node local dns in router %s with match %s: %v", c.config.ClusterRouter, policy.Match, err)
×
1199
                        return err
×
1200
                }
×
1201
        }
1202
        return nil
×
1203
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc