• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 23428559357

23 Mar 2026 08:41AM UTC coverage: 23.792% (+0.07%) from 23.72%
23428559357

push

github

web-flow
fix(controller): remove dead code in retryDelDupChassis (#6503)

The loop only exits via break when i >= attempts-1, making the
post-loop condition always true and the "finish check chassis"
path unreachable. Simplify to a standard for-range loop.

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

0 of 9 new or added lines in 1 file covered. (0.0%)

53 existing lines in 1 file now uncovered.

13123 of 55157 relevant lines covered (23.79%)

0.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

12.39
/pkg/controller/node.go
1
package controller
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "maps"
8
        "reflect"
9
        "slices"
10
        "strconv"
11
        "strings"
12
        "time"
13

14
        goping "github.com/prometheus-community/pro-bing"
15
        "github.com/scylladb/go-set/strset"
16
        v1 "k8s.io/api/core/v1"
17
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/labels"
20
        "k8s.io/apimachinery/pkg/types"
21
        "k8s.io/client-go/tools/cache"
22
        "k8s.io/klog/v2"
23

24
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
25
        "github.com/kubeovn/kube-ovn/pkg/ovs"
26
        "github.com/kubeovn/kube-ovn/pkg/util"
27
)
28

29
type ErrChassisNotFound struct {
30
        Chassis string
31
        Node    string
32
}
33

34
func (e *ErrChassisNotFound) Error() string {
×
35
        return fmt.Sprintf("chassis %s not found for node %s", e.Chassis, e.Node)
×
36
}
×
37

38
func (c *Controller) enqueueAddNode(obj any) {
×
39
        key := cache.MetaObjectToName(obj.(*v1.Node)).String()
×
40
        klog.V(3).Infof("enqueue add node %s", key)
×
41
        c.addNodeQueue.Add(key)
×
42
}
×
43

44
func nodeReady(node *v1.Node) bool {
×
45
        var ready, networkUnavailable bool
×
46
        for _, c := range node.Status.Conditions {
×
47
                switch c.Type {
×
48
                case v1.NodeReady:
×
49
                        ready = c.Status == v1.ConditionTrue
×
50
                case v1.NodeNetworkUnavailable:
×
51
                        networkUnavailable = c.Status == v1.ConditionTrue
×
52
                }
53
        }
54
        return ready && !networkUnavailable
×
55
}
56

57
func kubeOvnAnnotationsChanged(oldAnnotations, newAnnotations map[string]string) bool {
1✔
58
        filterKubeOvnAnnotations := func(annotations map[string]string) map[string]string {
2✔
59
                filtered := make(map[string]string)
1✔
60
                for key, value := range annotations {
2✔
61
                        if strings.Contains(key, ".kubernetes.io/") {
2✔
62
                                filtered[key] = value
1✔
63
                        }
1✔
64
                }
65
                return filtered
1✔
66
        }
67

68
        return !maps.Equal(filterKubeOvnAnnotations(oldAnnotations), filterKubeOvnAnnotations(newAnnotations))
1✔
69
}
70

71
func (c *Controller) enqueueUpdateNode(oldObj, newObj any) {
×
72
        oldNode := oldObj.(*v1.Node)
×
73
        newNode := newObj.(*v1.Node)
×
74

×
75
        if nodeReady(oldNode) != nodeReady(newNode) ||
×
76
                kubeOvnAnnotationsChanged(oldNode.Annotations, newNode.Annotations) ||
×
77
                !reflect.DeepEqual(oldNode.Labels, newNode.Labels) {
×
78
                key := cache.MetaObjectToName(newNode).String()
×
79
                if len(newNode.Annotations) == 0 || newNode.Annotations[util.AllocatedAnnotation] != "true" {
×
80
                        klog.V(3).Infof("enqueue add node %s", key)
×
81
                        c.addNodeQueue.Add(key)
×
82
                } else {
×
83
                        klog.V(3).Infof("enqueue update node %s", key)
×
84
                        c.updateNodeQueue.Add(key)
×
85
                }
×
86
        }
87
}
88

89
func (c *Controller) enqueueDeleteNode(obj any) {
×
90
        var node *v1.Node
×
91
        switch t := obj.(type) {
×
92
        case *v1.Node:
×
93
                node = t
×
94
        case cache.DeletedFinalStateUnknown:
×
95
                n, ok := t.Obj.(*v1.Node)
×
96
                if !ok {
×
97
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
98
                        return
×
99
                }
×
100
                node = n
×
101
        default:
×
102
                klog.Warningf("unexpected type: %T", obj)
×
103
                return
×
104
        }
105

106
        key := cache.MetaObjectToName(node).String()
×
107
        klog.V(3).Infof("enqueue delete node %s", key)
×
108
        c.deletingNodeObjMap.Store(key, node)
×
109
        c.deleteNodeQueue.Add(key)
×
110
}
111

112
func nodeUnderlayAddressSetName(node string, af int) string {
×
113
        return fmt.Sprintf("node_%s_underlay_v%d", strings.ReplaceAll(node, "-", "_"), af)
×
114
}
×
115

116
func (c *Controller) handleAddNode(key string) error {
×
117
        c.nodeKeyMutex.LockKey(key)
×
118
        defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
×
119

120
        cachedNode, err := c.nodesLister.Get(key)
×
121
        if err != nil {
×
122
                if k8serrors.IsNotFound(err) {
×
123
                        return nil
×
124
                }
×
125
                klog.Errorf("failed to get node %s: %v", key, err)
×
126
                return err
×
127
        }
128
        node := cachedNode.DeepCopy()
×
129
        klog.Infof("handle add node %s", node.Name)
×
130

×
131
        subnets, err := c.subnetsLister.List(labels.Everything())
×
132
        if err != nil {
×
133
                klog.Errorf("failed to list subnets: %v", err)
×
134
                return err
×
135
        }
×
136

137
        nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
×
138
        for _, subnet := range subnets {
×
139
                if subnet.Spec.Vpc != c.config.ClusterRouter {
×
140
                        continue
×
141
                }
142

143
                v4, v6 := util.SplitStringIP(subnet.Spec.CIDRBlock)
×
144
                if subnet.Spec.Vlan == "" && (util.CIDRContainIP(v4, nodeIPv4) || util.CIDRContainIP(v6, nodeIPv6)) {
×
145
                        msg := fmt.Sprintf("internal IP address of node %s is in CIDR of subnet %s, this may result in network issues", node.Name, subnet.Name)
×
146
                        klog.Warning(msg)
×
147
                        c.recorder.Eventf(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: node.Name, UID: types.UID(node.Name)}}, v1.EventTypeWarning, "NodeAddressConflictWithSubnet", msg)
×
148
                        break
×
149
                }
150
        }
151

152
        if err = c.handleNodeAnnotationsForProviderNetworks(node); err != nil {
×
153
                klog.Errorf("failed to handle annotations of node %s for provider networks: %v", node.Name, err)
×
154
                return err
×
155
        }
×
156

157
        subnet, err := c.subnetsLister.Get(c.config.NodeSwitch)
×
158
        if err != nil {
×
159
                klog.Errorf("failed to get node subnet: %v", err)
×
160
                return err
×
161
        }
×
162

163
        var v4IP, v6IP, mac string
×
164

×
165
        portName := util.NodeLspName(key)
×
166
        if node.Annotations[util.AllocatedAnnotation] == "true" && node.Annotations[util.IPAddressAnnotation] != "" && node.Annotations[util.MacAddressAnnotation] != "" {
×
167
                macStr := node.Annotations[util.MacAddressAnnotation]
×
168
                v4IP, v6IP, mac, err = c.ipam.GetStaticAddress(portName, portName, node.Annotations[util.IPAddressAnnotation],
×
169
                        &macStr, node.Annotations[util.LogicalSwitchAnnotation], true)
×
170
                if err != nil {
×
171
                        klog.Errorf("failed to alloc static ip addrs for node %v: %v", node.Name, err)
×
172
                        return err
×
173
                }
×
174
        } else {
×
175
                v4IP, v6IP, mac, err = c.ipam.GetRandomAddress(portName, portName, nil, c.config.NodeSwitch, "", nil, true)
×
176
                if err != nil {
×
177
                        klog.Errorf("failed to alloc random ip addrs for node %v: %v", node.Name, err)
×
178
                        return err
×
179
                }
×
180

181
                // Clean up potentially existing logical switch ports to avoid leftover issues from previous failed configurations
182
                if err := c.OVNNbClient.DeleteLogicalSwitchPort(portName); err != nil {
×
183
                        klog.Errorf("failed to delete stale logical switch port %s: %v", portName, err)
×
184
                        return err
×
185
                }
×
186
                klog.Infof("deleted stale logical switch port %s", portName)
×
187
        }
188

189
        ipStr := util.GetStringIP(v4IP, v6IP)
×
190
        if err := c.OVNNbClient.CreateBareLogicalSwitchPort(c.config.NodeSwitch, portName, ipStr, mac); err != nil {
×
191
                klog.Errorf("failed to create logical switch port %s: %v", portName, err)
×
192
                return err
×
193
        }
×
194

195
        for ip := range strings.SplitSeq(ipStr, ",") {
×
196
                if ip == "" {
×
197
                        continue
×
198
                }
199

200
                nodeIP, af := nodeIPv4, 4
×
201
                protocol := util.CheckProtocol(ip)
×
202
                if protocol == kubeovnv1.ProtocolIPv6 {
×
203
                        nodeIP, af = nodeIPv6, 6
×
204
                }
×
205
                if nodeIP != "" {
×
206
                        var (
×
207
                                match       = fmt.Sprintf("ip%d.dst == %s", af, nodeIP)
×
208
                                action      = kubeovnv1.PolicyRouteActionReroute
×
209
                                externalIDs = map[string]string{
×
210
                                        "vendor":         util.CniTypeName,
×
211
                                        "node":           node.Name,
×
212
                                        "address-family": strconv.Itoa(af),
×
213
                                }
×
214
                        )
×
215
                        klog.Infof("add policy route for router: %s, match %s, action %s, nexthop %s, externalID %v", c.config.ClusterRouter, match, action, ip, externalIDs)
×
216
                        if err = c.addPolicyRouteToVpc(
×
217
                                c.config.ClusterRouter,
×
218
                                &kubeovnv1.PolicyRoute{
×
219
                                        Priority:  util.NodeRouterPolicyPriority,
×
220
                                        Match:     match,
×
221
                                        Action:    action,
×
222
                                        NextHopIP: ip,
×
223
                                },
×
224
                                externalIDs,
×
225
                        ); err != nil {
×
226
                                klog.Errorf("failed to add logical router policy for node %s: %v", node.Name, err)
×
227
                                return err
×
228
                        }
×
229

230
                        dnsIPs := make([]string, 0, len(c.config.NodeLocalDNSIPs))
×
231
                        for _, ip := range c.config.NodeLocalDNSIPs {
×
232
                                if util.CheckProtocol(ip) == protocol {
×
233
                                        dnsIPs = append(dnsIPs, ip)
×
234
                                }
×
235
                        }
236

237
                        if err = c.addPolicyRouteForLocalDNSCacheOnNode(dnsIPs, portName, ip, node.Name, af); err != nil {
×
238
                                klog.Errorf("failed to add policy route for node %s: %v", node.Name, err)
×
239
                                return err
×
240
                        }
×
241
                }
242
        }
243

244
        patch := util.KVPatch{
×
245
                util.IPAddressAnnotation:     ipStr,
×
246
                util.MacAddressAnnotation:    mac,
×
247
                util.CidrAnnotation:          subnet.Spec.CIDRBlock,
×
248
                util.GatewayAnnotation:       subnet.Spec.Gateway,
×
249
                util.LogicalSwitchAnnotation: c.config.NodeSwitch,
×
250
                util.AllocatedAnnotation:     "true",
×
251
                util.PortNameAnnotation:      portName,
×
252
        }
×
253
        if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
254
                klog.Errorf("failed to update annotations of node %s: %v", node.Name, err)
×
255
                return err
×
256
        }
×
257

258
        if err := c.createOrUpdateIPCR("", "", ipStr, mac, c.config.NodeSwitch, "", node.Name, ""); err != nil {
×
259
                klog.Errorf("failed to create or update IPs %s: %v", portName, err)
×
260
                return err
×
261
        }
×
262

263
        for _, subnet := range subnets {
×
264
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWDistributedType {
×
265
                        continue
×
266
                }
267
                if err = c.createPortGroupForDistributedSubnet(node, subnet); err != nil {
×
268
                        klog.Errorf("failed to create port group for node %s and subnet %s: %v", node.Name, subnet.Name, err)
×
269
                        return err
×
270
                }
×
271
        }
272
        c.distributedSubnetNeedSync.Store(true)
×
273

×
274
        // ovn acl doesn't support address_set name with '-', so replace '-' by '.'
×
275
        pgName := strings.ReplaceAll(portName, "-", ".")
×
276
        if err = c.OVNNbClient.CreatePortGroup(pgName, map[string]string{"node": node.Name, networkPolicyKey: "node" + "/" + key}); err != nil {
×
277
                klog.Errorf("create port group %s for node %s: %v", pgName, key, err)
×
278
                return err
×
279
        }
×
280

281
        if err := c.addPolicyRouteForCentralizedSubnetOnNode(node, ipStr); err != nil {
×
282
                klog.Errorf("failed to add policy route for node %s, %v", key, err)
×
283
                return err
×
284
        }
×
285

286
        if err := c.UpdateChassisTag(node); err != nil {
×
287
                klog.Errorf("failed to update chassis tag for node %s: %v", node.Name, err)
×
288
                return err
×
289
        }
×
290

291
        if err := c.retryDelDupChassis(util.ChassisRetryMaxTimes, util.ChassisControllerRetryInterval, c.cleanDuplicatedChassis, node); err != nil {
×
292
                klog.Errorf("failed to clean duplicated chassis for node %s: %v", node.Name, err)
×
293
                return err
×
294
        }
×
295
        return nil
×
296
}
297

298
func (c *Controller) handleNodeAnnotationsForProviderNetworks(node *v1.Node) error {
×
299
        providerNetworks, err := c.providerNetworksLister.List(labels.Everything())
×
300
        if err != nil && !k8serrors.IsNotFound(err) {
×
301
                klog.Errorf("failed to list provider networks: %v", err)
×
302
                return err
×
303
        }
×
304

305
        for _, pn := range providerNetworks {
×
306
                excludeAnno := fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name)
×
307
                interfaceAnno := fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)
×
308

×
309
                var newPn *kubeovnv1.ProviderNetwork
×
310
                excluded, err := util.IsNodeExcludedFromProviderNetwork(node, pn)
×
311
                if err != nil {
×
312
                        klog.Error(err)
×
313
                        return err
×
314
                }
×
315

316
                // Handle node annotation for exclusion (only when nodeSelector is not set)
317
                if !excluded && pn.Spec.NodeSelector == nil && len(node.Annotations) != 0 && node.Annotations[excludeAnno] == "true" {
×
318
                        newPn = pn.DeepCopy()
×
319
                        newPn.Spec.ExcludeNodes = append(newPn.Spec.ExcludeNodes, node.Name)
×
320
                        excluded = true
×
321
                }
×
322

323
                var customInterface string
×
324
                for _, v := range pn.Spec.CustomInterfaces {
×
325
                        if slices.Contains(v.Nodes, node.Name) {
×
326
                                customInterface = v.Interface
×
327
                                break
×
328
                        }
329
                }
330
                if customInterface == "" && len(node.Annotations) != 0 {
×
331
                        if customInterface = node.Annotations[interfaceAnno]; customInterface != "" {
×
332
                                if newPn == nil {
×
333
                                        newPn = pn.DeepCopy()
×
334
                                }
×
335
                                var index *int
×
336
                                for i := range newPn.Spec.CustomInterfaces {
×
337
                                        if newPn.Spec.CustomInterfaces[i].Interface == customInterface {
×
338
                                                index = &i
×
339
                                                break
×
340
                                        }
341
                                }
342
                                if index != nil {
×
343
                                        newPn.Spec.CustomInterfaces[*index].Nodes = append(newPn.Spec.CustomInterfaces[*index].Nodes, node.Name)
×
344
                                } else {
×
345
                                        ci := kubeovnv1.CustomInterface{Interface: customInterface, Nodes: []string{node.Name}}
×
346
                                        newPn.Spec.CustomInterfaces = append(newPn.Spec.CustomInterfaces, ci)
×
347
                                }
×
348
                        }
349
                }
350

351
                if newPn != nil {
×
352
                        if newPn, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Update(context.Background(), newPn, metav1.UpdateOptions{}); err != nil {
×
353
                                klog.Errorf("failed to update provider network %s: %v", pn.Name, err)
×
354
                                return err
×
355
                        }
×
356
                }
357

358
                if len(node.Annotations) != 0 {
×
359
                        patch := util.KVPatch{excludeAnno: nil, interfaceAnno: nil}
×
360
                        if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
361
                                klog.Errorf("failed to patch node %s: %v", node.Name, err)
×
362
                                return err
×
363
                        }
×
364
                }
365

366
                if excluded {
×
367
                        if newPn == nil {
×
368
                                newPn = pn.DeepCopy()
×
369
                        } else {
×
370
                                newPn = newPn.DeepCopy()
×
371
                        }
×
372

373
                        if newPn.Status.EnsureNodeStandardConditions(node.Name) {
×
374
                                _, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().UpdateStatus(context.Background(), newPn, metav1.UpdateOptions{})
×
375
                                if err != nil {
×
376
                                        klog.Errorf("failed to update status of provider network %s: %v", pn.Name, err)
×
377
                                        return err
×
378
                                }
×
379
                        }
380
                }
381
        }
382

383
        return nil
×
384
}
385

386
func (c *Controller) handleDeleteNode(key string) (err error) {
×
387
        c.nodeKeyMutex.LockKey(key)
×
388
        defer func() {
×
389
                _ = c.nodeKeyMutex.UnlockKey(key)
×
390
                if err == nil {
×
391
                        c.deletingNodeObjMap.Delete(key)
×
392
                }
×
393
        }()
394
        klog.Infof("handle delete node %s", key)
×
395

×
396
        node, ok := c.deletingNodeObjMap.Load(key)
×
397
        if !ok {
×
398
                return nil
×
399
        }
×
400
        n, _ := c.nodesLister.Get(key)
×
401
        if n != nil && n.UID != node.UID {
×
402
                klog.Warningf("Node %s is adding, skip the node delete handler, but it may leave some gc resources behind", key)
×
403
                return nil
×
404
        }
×
405
        return c.deleteNode(key)
×
406
}
407

408
func (c *Controller) deleteNode(key string) error {
×
409
        portName := util.NodeLspName(key)
×
410
        klog.Infof("delete logical switch port %s", portName)
×
411
        if err := c.OVNNbClient.DeleteLogicalSwitchPort(portName); err != nil {
×
412
                klog.Errorf("failed to delete node switch port %s: %v", portName, err)
×
413
                return err
×
414
        }
×
415
        if err := c.OVNSbClient.DeleteChassisByHost(key); err != nil {
×
416
                klog.Errorf("failed to delete chassis for node %s: %v", key, err)
×
417
                return err
×
418
        }
×
419

420
        for _, af := range [...]int{4, 6} {
×
421
                if err := c.deletePolicyRouteForLocalDNSCacheOnNode(key, af); err != nil {
×
422
                        klog.Error(err)
×
423
                        return err
×
424
                }
×
425
        }
426

427
        // ovn acl doesn't support address_set name with '-', so replace '-' by '.'
428
        pgName := strings.ReplaceAll(portName, "-", ".")
×
429
        if err := c.OVNNbClient.DeletePortGroup(pgName); err != nil {
×
430
                klog.Errorf("delete port group %s for node: %v", portName, err)
×
431
                return err
×
432
        }
×
433

434
        if err := c.deletePolicyRouteForNode(key, portName); err != nil {
×
435
                klog.Errorf("failed to delete policy route for node %s: %v", key, err)
×
436
                return err
×
437
        }
×
438

439
        if err := c.OVNNbClient.DeleteAddressSet(nodeUnderlayAddressSetName(key, 4)); err != nil {
×
440
                klog.Errorf("failed to delete address set for node %s: %v", key, err)
×
441
                return err
×
442
        }
×
443
        if err := c.OVNNbClient.DeleteAddressSet(nodeUnderlayAddressSetName(key, 6)); err != nil {
×
444
                klog.Errorf("failed to delete address set for node %s: %v", key, err)
×
445
                return err
×
446
        }
×
447

448
        klog.Infof("release node port %s", portName)
×
449
        c.ipam.ReleaseAddressByPod(portName, c.config.NodeSwitch)
×
450

×
451
        providerNetworks, err := c.providerNetworksLister.List(labels.Everything())
×
452
        if err != nil && !k8serrors.IsNotFound(err) {
×
453
                klog.Errorf("failed to list provider networks: %v", err)
×
454
                return err
×
455
        }
×
456

457
        for _, pn := range providerNetworks {
×
458
                if err = c.updateProviderNetworkForNodeDeletion(pn, key); err != nil {
×
459
                        klog.Error(err)
×
460
                        return err
×
461
                }
×
462
        }
463
        klog.Infof("delete node ip %s", portName)
×
464
        if err = c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), portName, metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) {
×
465
                return err
×
466
        }
×
467

468
        return nil
×
469
}
470

471
func (c *Controller) updateProviderNetworkForNodeDeletion(pn *kubeovnv1.ProviderNetwork, node string) error {
×
472
        // update provider network status
×
473
        var needUpdate bool
×
474
        newPn := pn.DeepCopy()
×
475
        if slices.Contains(newPn.Status.ReadyNodes, node) {
×
476
                newPn.Status.ReadyNodes = util.RemoveString(newPn.Status.ReadyNodes, node)
×
477
                needUpdate = true
×
478
        }
×
479
        if newPn.Status.RemoveNodeConditions(node) {
×
480
                needUpdate = true
×
481
        }
×
482
        if needUpdate {
×
483
                var err error
×
484
                newPn, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().UpdateStatus(context.Background(), newPn, metav1.UpdateOptions{})
×
485
                if err != nil {
×
486
                        klog.Errorf("failed to update status of provider network %s: %v", pn.Name, err)
×
487
                        return err
×
488
                }
×
489
        }
490

491
        // update provider network spec
492
        pn, newPn = newPn, nil
×
493
        if excludeNodes := util.RemoveString(pn.Spec.ExcludeNodes, node); len(excludeNodes) != len(pn.Spec.ExcludeNodes) {
×
494
                newPn = pn.DeepCopy()
×
495
                newPn.Spec.ExcludeNodes = excludeNodes
×
496
        }
×
497

498
        var changed bool
×
499
        customInterfaces := make([]kubeovnv1.CustomInterface, 0, len(pn.Spec.CustomInterfaces))
×
500
        for _, ci := range pn.Spec.CustomInterfaces {
×
501
                nodes := util.RemoveString(ci.Nodes, node)
×
502
                if !changed {
×
503
                        changed = len(nodes) == 0 || len(nodes) != len(ci.Nodes)
×
504
                }
×
505
                if len(nodes) != 0 {
×
506
                        customInterfaces = append(customInterfaces, kubeovnv1.CustomInterface{Interface: ci.Interface, Nodes: nodes})
×
507
                }
×
508
        }
509
        if changed {
×
510
                newPn = pn.DeepCopy()
×
511
                newPn.Spec.CustomInterfaces = customInterfaces
×
512
        }
×
513
        if newPn != nil {
×
514
                if _, err := c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Update(context.Background(), newPn, metav1.UpdateOptions{}); err != nil {
×
515
                        klog.Errorf("failed to update provider network %s: %v", pn.Name, err)
×
516
                        return err
×
517
                }
×
518
        }
519

520
        return nil
×
521
}
522

523
func (c *Controller) handleUpdateNode(key string) error {
×
524
        c.nodeKeyMutex.LockKey(key)
×
525
        defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
×
526
        klog.Infof("handle update node %s", key)
×
527

×
528
        node, err := c.nodesLister.Get(key)
×
529
        if err != nil {
×
530
                if k8serrors.IsNotFound(err) {
×
531
                        return nil
×
532
                }
×
533
                klog.Errorf("failed to get node %s: %v", key, err)
×
534
                return err
×
535
        }
536

537
        if err = c.handleNodeAnnotationsForProviderNetworks(node); err != nil {
×
538
                klog.Errorf("failed to handle annotations of node %s for provider networks: %v", node.Name, err)
×
539
                return err
×
540
        }
×
541

542
        subnets, err := c.subnetsLister.List(labels.Everything())
×
543
        if err != nil {
×
544
                klog.Errorf("failed to get subnets %v", err)
×
545
                return err
×
546
        }
×
547

548
        if err := c.UpdateChassisTag(node); err != nil {
×
549
                klog.Errorf("failed to update chassis tag for node %s: %v", node.Name, err)
×
550
                return err
×
551
        }
×
552
        if err := c.retryDelDupChassis(util.ChassisRetryMaxTimes, util.ChassisControllerRetryInterval, c.cleanDuplicatedChassis, node); err != nil {
×
553
                klog.Errorf("failed to clean duplicated chassis for node %s: %v", node.Name, err)
×
554
                return err
×
555
        }
×
556

557
        c.distributedSubnetNeedSync.Store(true)
×
558

×
559
        for _, cachedSubnet := range subnets {
×
560
                if cachedSubnet.Spec.GatewayType != kubeovnv1.GWCentralizedType {
×
561
                        continue
×
562
                }
563

564
                // For subnets using GatewayNodeSelectors, always trigger reconciliation
565
                // when node labels change, since the node might have been added or removed
566
                // from the gateway list
567
                if cachedSubnet.Spec.GatewayNode == "" && len(cachedSubnet.Spec.GatewayNodeSelectors) > 0 {
×
568
                        c.addOrUpdateSubnetQueue.Add(cachedSubnet.Name)
×
569
                        continue
×
570
                }
571

572
                if util.GatewayContains(cachedSubnet.Spec.GatewayNode, node.Name) {
×
573
                        if err := c.reconcileOvnDefaultVpcRoute(cachedSubnet); err != nil {
×
574
                                klog.Error(err)
×
575
                                return err
×
576
                        }
×
577
                }
578
        }
579

580
        return nil
×
581
}
582

583
func (c *Controller) syncDistributedSubnetRoutes() {
×
584
        if !c.distributedSubnetNeedSync.Swap(false) {
×
585
                return
×
586
        }
×
587

588
        klog.V(3).Infoln("start to sync distributed subnet routes")
×
589
        subnets, err := c.subnetsLister.List(labels.Everything())
×
590
        if err != nil {
×
591
                klog.Errorf("failed to list subnets: %v", err)
×
592
                c.distributedSubnetNeedSync.Store(true)
×
593
                return
×
594
        }
×
595

596
        for _, subnet := range subnets {
×
597
                if subnet.Spec.Vpc != c.config.ClusterRouter ||
×
598
                        subnet.Name == c.config.NodeSwitch ||
×
599
                        subnet.Spec.GatewayType != kubeovnv1.GWDistributedType ||
×
600
                        (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) {
×
601
                        continue
×
602
                }
603
                if err := c.reconcileDistributedSubnetRouteInDefaultVpc(subnet); err != nil {
×
604
                        klog.Errorf("failed to reconcile distributed subnet %s route: %v", subnet.Name, err)
×
605
                        c.distributedSubnetNeedSync.Store(true)
×
606
                }
×
607
        }
608
}
609

610
func (c *Controller) checkSubnetGateway() {
×
611
        if err := c.checkSubnetGatewayNode(); err != nil {
×
612
                klog.Errorf("failed to check subnet gateway node: %v", err)
×
613
        }
×
614
}
615

616
func (c *Controller) checkSubnetGatewayNode() error {
×
617
        klog.V(3).Infoln("start to check subnet gateway node")
×
618
        subnetList, err := c.subnetsLister.List(labels.Everything())
×
619
        if err != nil {
×
620
                klog.Errorf("failed to list subnets: %v", err)
×
621
                return err
×
622
        }
×
623
        nodes, err := c.nodesLister.List(labels.Everything())
×
624
        if err != nil {
×
625
                klog.Errorf("failed to list nodes: %v", err)
×
626
                return err
×
627
        }
×
628

629
        for _, subnet := range subnetList {
×
630
                if (subnet.Spec.Vlan != "" && (subnet.Spec.U2OInterconnection || !subnet.Spec.LogicalGateway)) ||
×
631
                        subnet.Spec.Vpc != c.config.ClusterRouter ||
×
632
                        subnet.Name == c.config.NodeSwitch ||
×
633
                        subnet.Spec.GatewayNode == "" ||
×
634
                        subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType ||
×
635
                        !subnet.Spec.EnableEcmp {
×
636
                        continue
×
637
                }
638
                gwNodes := strings.Split(subnet.Spec.GatewayNode, ",")
×
639
                if len(gwNodes) < 2 {
×
640
                        continue
×
641
                }
642

643
                for cidrBlock := range strings.SplitSeq(subnet.Spec.CIDRBlock, ",") {
×
644
                        nextHops, nameIPMap, err := c.getPolicyRouteParams(cidrBlock, util.GatewayRouterPolicyPriority)
×
645
                        if err != nil {
×
646
                                klog.Errorf("failed to get ecmp policy route paras for subnet %s: %v", subnet.Name, err)
×
647
                                continue
×
648
                        }
649
                        for _, node := range nodes {
×
650
                                ipStr := node.Annotations[util.IPAddressAnnotation]
×
651
                                for ip := range strings.SplitSeq(ipStr, ",") {
×
652
                                        if util.CheckProtocol(cidrBlock) != util.CheckProtocol(ip) {
×
653
                                                continue
×
654
                                        }
655

656
                                        exist := nameIPMap[node.Name] == ip
×
657
                                        if util.GatewayContains(subnet.Spec.GatewayNode, node.Name) {
×
658
                                                pinger, err := goping.NewPinger(ip)
×
659
                                                if err != nil {
×
660
                                                        return fmt.Errorf("failed to init pinger, %w", err)
×
661
                                                }
×
662
                                                pinger.SetPrivileged(true)
×
663

×
664
                                                count := 5
×
665
                                                pinger.Count = count
×
666
                                                pinger.Timeout = time.Duration(count) * time.Second
×
667
                                                pinger.Interval = 1 * time.Second
×
668

×
669
                                                var pingSucceeded bool
×
670
                                                pinger.OnRecv = func(_ *goping.Packet) {
×
671
                                                        pingSucceeded = true
×
672
                                                        pinger.Stop()
×
673
                                                }
×
674
                                                if err = pinger.Run(); err != nil {
×
675
                                                        klog.Errorf("failed to run pinger for destination %s: %v", ip, err)
×
676
                                                        return err
×
677
                                                }
×
678

679
                                                nodeIsReady := nodeReady(node)
×
680
                                                if !pingSucceeded || !nodeIsReady {
×
681
                                                        if exist {
×
682
                                                                if !pingSucceeded {
×
683
                                                                        klog.Warningf("failed to ping %s ip %s on node %s", util.NodeNic, ip, node.Name)
×
684
                                                                }
×
685
                                                                if !nodeIsReady {
×
686
                                                                        klog.Warningf("node %s is not ready", node.Name)
×
687
                                                                }
×
688
                                                                klog.Warningf("delete ecmp policy route for node %s ip %s", node.Name, ip)
×
689
                                                                nextHops.Remove(ip)
×
690
                                                                delete(nameIPMap, node.Name)
×
691
                                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
692
                                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
693
                                                                        klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
×
694
                                                                        return err
×
695
                                                                }
×
696
                                                        }
697
                                                } else {
×
698
                                                        klog.V(3).Infof("succeeded to ping %s ip %s on node %s", util.NodeNic, ip, node.Name)
×
699
                                                        if !exist {
×
700
                                                                nextHops.Add(ip)
×
701
                                                                if nameIPMap == nil {
×
702
                                                                        nameIPMap = make(map[string]string, 1)
×
703
                                                                }
×
704
                                                                nameIPMap[node.Name] = ip
×
705
                                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
706
                                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
707
                                                                        klog.Errorf("failed to add ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
×
708
                                                                        return err
×
709
                                                                }
×
710
                                                        }
711
                                                }
712
                                        } else if exist {
×
713
                                                klog.Infof("subnet %s gateway nodes does not contain node %s, delete policy route for node ip %s", subnet.Name, node.Name, ip)
×
714
                                                nextHops.Remove(ip)
×
715
                                                delete(nameIPMap, node.Name)
×
716
                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
717
                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
718
                                                        klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
×
719
                                                        return err
×
720
                                                }
×
721
                                        }
722
                                }
723
                        }
724
                }
725
        }
726
        return nil
×
727
}
728

729
func (c *Controller) cleanDuplicatedChassis(node *v1.Node) error {
1✔
730
        // if multi chassis has the same node name, delete all of them
1✔
731
        _, err := c.OVNSbClient.GetChassisByHost(node.Name)
1✔
732
        if err == nil {
2✔
733
                return nil
1✔
734
        }
1✔
735

736
        if !errors.Is(err, ovs.ErrOneNodeMultiChassis) {
2✔
737
                klog.Errorf("failed to get chassis for node %s: %v", node.Name, err)
1✔
738
                return err
1✔
739
        }
1✔
740

741
        klog.Warningf("node %s has multiple chassis, deleting all", node.Name)
1✔
742
        if err := c.OVNSbClient.DeleteChassisByHost(node.Name); err != nil {
2✔
743
                klog.Errorf("failed to delete chassis for node %s: %v", node.Name, err)
1✔
744
                return err
1✔
745
        }
1✔
746
        return nil
1✔
747
}
748

NEW
749
func (c *Controller) retryDelDupChassis(attempts, sleep int, f func(node *v1.Node) error, node *v1.Node) error {
×
NEW
750
        for i := range attempts {
×
NEW
751
                err := f(node)
×
752
                if err == nil {
×
NEW
753
                        return nil
×
754
                }
×
755
                klog.Errorf("failed to delete duplicated chassis for node %s: %v", node.Name, err)
×
NEW
756
                if i < attempts-1 {
×
NEW
757
                        time.Sleep(time.Duration(sleep) * time.Second)
×
758
                }
×
759
        }
NEW
760
        errMsg := errors.New("exhausting all attempts")
×
NEW
761
        klog.Error(errMsg)
×
NEW
762
        return errMsg
×
763
}
764

765
func (c *Controller) fetchPodsOnNode(nodeName string, pods []*v1.Pod) ([]string, error) {
1✔
766
        ports := make([]string, 0, len(pods))
1✔
767
        for _, pod := range pods {
1✔
768
                if pod.Spec.HostNetwork || pod.Spec.NodeName != nodeName || !isPodAlive(pod) {
×
769
                        continue
×
770
                }
771

772
                if pod.Annotations[util.LogicalRouterAnnotation] != c.config.ClusterRouter {
×
773
                        subnetName := pod.Annotations[util.LogicalSwitchAnnotation]
×
774
                        if subnetName == "" {
×
775
                                klog.V(4).Infof("Pod %s/%s is not on cluster router and has no logical switch annotation, skipping for VLAN check.", pod.Namespace, pod.Name)
×
776
                                continue
×
777
                        }
778

779
                        subnet, err := c.subnetsLister.Get(subnetName)
×
780
                        if err != nil {
×
781
                                klog.Errorf("failed to get subnet %s: %v", subnetName, err)
×
782
                                return nil, err
×
783
                        }
×
784

785
                        if subnet.Spec.Vlan == "" {
×
786
                                continue
×
787
                        }
788
                }
789

790
                podName := c.getNameByPod(pod)
×
791

×
792
                podNets, err := c.getPodKubeovnNets(pod)
×
793
                if err != nil {
×
794
                        klog.Errorf("failed to get pod nets %v", err)
×
795
                        return nil, err
×
796
                }
×
797

798
                for _, podNet := range podNets {
×
799
                        if !isOvnSubnet(podNet.Subnet) {
×
800
                                continue
×
801
                        }
802

803
                        if pod.Annotations != nil && pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "true" {
×
804
                                ports = append(ports, ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName))
×
805
                        }
×
806
                }
807
        }
808
        return ports, nil
1✔
809
}
810

811
func (c *Controller) CheckNodePortGroup() {
×
812
        if err := c.checkAndUpdateNodePortGroup(); err != nil {
×
813
                klog.Errorf("check node port group status: %v", err)
×
814
        }
×
815
}
816

817
func (c *Controller) checkAndUpdateNodePortGroup() error {
1✔
818
        klog.V(3).Infoln("start to check node port-group status")
1✔
819
        var networkPolicyExists bool
1✔
820
        if c.config.EnableNP {
2✔
821
                np, err := c.npsLister.List(labels.Everything())
1✔
822
                if err != nil {
2✔
823
                        klog.Errorf("failed to list network policies: %v", err)
1✔
824
                        return err
1✔
825
                }
1✔
826
                networkPolicyExists = len(np) != 0
×
827
        }
828

829
        nodes, err := c.nodesLister.List(labels.Everything())
1✔
830
        if err != nil {
1✔
831
                klog.Errorf("list nodes: %v", err)
×
832
                return err
×
833
        }
×
834

835
        pods, err := c.podsLister.List(labels.Everything())
1✔
836
        if err != nil {
1✔
837
                klog.Errorf("list pods, %v", err)
×
838
                return err
×
839
        }
×
840

841
        for _, node := range nodes {
2✔
842
                // The port-group should already created when add node
1✔
843
                pgName := strings.ReplaceAll(node.Annotations[util.PortNameAnnotation], "-", ".")
1✔
844
                if pgName == "" {
2✔
845
                        klog.V(2).Infof("node %s does not have port name annotation, skip port group update", node.Name)
1✔
846
                        continue
1✔
847
                }
848

849
                // use join IP only when no internal IP exists
850
                nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
1✔
851
                joinIP := node.Annotations[util.IPAddressAnnotation]
1✔
852
                joinIPv4, joinIPv6 := util.SplitStringIP(joinIP)
1✔
853
                if nodeIPv4 == "" {
1✔
854
                        nodeIPv4 = joinIPv4
×
855
                }
×
856
                if nodeIPv6 == "" {
2✔
857
                        nodeIPv6 = joinIPv6
1✔
858
                }
1✔
859
                nodeIP := strings.Trim(fmt.Sprintf("%s,%s", nodeIPv4, nodeIPv6), ",")
1✔
860

1✔
861
                nodePorts, err := c.fetchPodsOnNode(node.Name, pods)
1✔
862
                if err != nil {
1✔
863
                        klog.Errorf("fetch pods for node %v: %v", node.Name, err)
×
864
                        return err
×
865
                }
×
866

867
                if err = c.OVNNbClient.PortGroupSetPorts(pgName, nodePorts); err != nil {
1✔
868
                        klog.Errorf("failed to set ports of port group %s: %v", pgName, err)
×
869
                        return err
×
870
                }
×
871

872
                if networkPolicyExists {
1✔
873
                        if err := c.OVNNbClient.CreateNodeACL(pgName, nodeIP, joinIP); err != nil {
×
874
                                klog.Errorf("create node acl for node pg %s: %v", pgName, err)
×
875
                        }
×
876
                } else {
1✔
877
                        // clear all acl
1✔
878
                        if err = c.OVNNbClient.DeleteAcls(pgName, portGroupKey, "", nil); err != nil {
1✔
UNCOV
879
                                klog.Errorf("delete node acl for node pg %s: %v", pgName, err)
×
880
                        }
×
881
                }
882
        }
883

884
        return nil
1✔
885
}
886

887
func (c *Controller) UpdateChassisTag(node *v1.Node) error {
×
888
        annoChassisName := node.Annotations[util.ChassisAnnotation]
×
889
        if annoChassisName == "" {
×
890
                // kube-ovn-cni not ready to set chassis
×
891
                return nil
×
892
        }
×
893
        chassis, err := c.OVNSbClient.GetChassis(annoChassisName, true)
×
894
        if err != nil {
×
895
                klog.Errorf("failed to get chassis %s for node %s: %v", annoChassisName, node.Name, err)
×
896
                return err
×
897
        }
×
898
        if chassis == nil {
×
899
                klog.Infof("chassis %q not registered for node %s, do chassis gc once", annoChassisName, node.Name)
×
900
                // chassis name conflict, do GC
×
901
                if err = c.gcChassis(); err != nil {
×
902
                        klog.Errorf("failed to gc chassis: %v", err)
×
903
                        return err
×
UNCOV
904
                }
×
UNCOV
905
                err = &ErrChassisNotFound{Chassis: annoChassisName, Node: node.Name}
×
906
                klog.Error(err)
×
907
                return err
×
908
        }
909

910
        if chassis.ExternalIDs == nil || chassis.ExternalIDs["vendor"] != util.CniTypeName {
×
911
                klog.Infof("init tag %s for node %s chassis %s", util.CniTypeName, node.Name, chassis.Name)
×
912
                if err = c.OVNSbClient.UpdateChassisTag(chassis.Name, node.Name); err != nil {
×
UNCOV
913
                        err := fmt.Errorf("failed to init chassis tag, %w", err)
×
914
                        klog.Error(err)
×
UNCOV
915
                        return err
×
UNCOV
916
                }
×
917
        }
918
        return nil
×
919
}
920

921
func (c *Controller) getPolicyRouteParams(cidr string, priority int) (*strset.Set, map[string]string, error) {
1✔
922
        ipSuffix := "ip4"
1✔
923
        if util.CheckProtocol(cidr) == kubeovnv1.ProtocolIPv6 {
1✔
924
                ipSuffix = "ip6"
×
925
        }
×
926
        match := fmt.Sprintf("%s.src == %s", ipSuffix, cidr)
1✔
927
        policyList, err := c.OVNNbClient.GetLogicalRouterPolicy(c.config.ClusterRouter, priority, match, true)
1✔
928
        if err != nil {
1✔
929
                klog.Errorf("failed to get logical router policy: %v", err)
×
930
                return nil, nil, err
×
931
        }
×
932
        if len(policyList) == 0 {
1✔
UNCOV
933
                return strset.New(), map[string]string{}, nil
×
934
        }
×
935
        return strset.New(policyList[0].Nexthops...), maps.Clone(policyList[0].ExternalIDs), nil
1✔
936
}
937

938
func (c *Controller) deletePolicyRouteForNode(nodeName, portName string) error {
×
939
        subnets, err := c.subnetsLister.List(labels.Everything())
×
UNCOV
940
        if err != nil {
×
941
                klog.Errorf("get subnets: %v", err)
×
942
                return err
×
943
        }
×
944

UNCOV
945
        addresses := c.ipam.GetPodAddress(portName)
×
946
        for _, addr := range addresses {
×
947
                if addr.IP == "" {
×
948
                        continue
×
949
                }
950
                klog.Infof("deleting logical router policy with nexthop %q from %s for node %s", addr.IP, c.config.ClusterRouter, nodeName)
×
UNCOV
951
                if err = c.OVNNbClient.DeleteLogicalRouterPolicyByNexthop(c.config.ClusterRouter, util.NodeRouterPolicyPriority, addr.IP); err != nil {
×
UNCOV
952
                        klog.Errorf("failed to delete logical router policy with nexthop %q from %s for node %s: %v", addr.IP, c.config.ClusterRouter, nodeName, err)
×
953
                        return err
×
954
                }
×
955
        }
956

UNCOV
957
        for _, subnet := range subnets {
×
958
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch {
×
959
                        continue
×
960
                }
961

962
                if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
×
963
                        pgName := getOverlaySubnetsPortGroupName(subnet.Name, nodeName)
×
UNCOV
964
                        if err = c.OVNNbClient.DeletePortGroup(pgName); err != nil {
×
965
                                klog.Errorf("delete port group for subnet %s and node %s: %v", subnet.Name, nodeName, err)
×
966
                                return err
×
967
                        }
×
968

969
                        klog.Infof("delete policy route for distributed subnet %s, node %s", subnet.Name, nodeName)
×
UNCOV
970
                        if err = c.deletePolicyRouteForDistributedSubnet(subnet, nodeName); err != nil {
×
UNCOV
971
                                klog.Errorf("delete policy route for subnet %s and node %s: %v", subnet.Name, nodeName, err)
×
972
                                return err
×
973
                        }
×
974
                }
975

976
                if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType {
×
977
                        if subnet.Spec.EnableEcmp {
×
978
                                for cidrBlock := range strings.SplitSeq(subnet.Spec.CIDRBlock, ",") {
×
UNCOV
979
                                        nextHops, nameIPMap, err := c.getPolicyRouteParams(cidrBlock, util.GatewayRouterPolicyPriority)
×
UNCOV
980
                                        if err != nil {
×
981
                                                klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
×
982
                                                continue
×
983
                                        }
984

UNCOV
985
                                        exist := false
×
986
                                        if _, ok := nameIPMap[nodeName]; ok {
×
987
                                                exist = true
×
988
                                        }
×
989

990
                                        if exist {
×
991
                                                nextHops.Remove(nameIPMap[nodeName])
×
992
                                                delete(nameIPMap, nodeName)
×
993

×
994
                                                if nextHops.Size() == 0 {
×
995
                                                        klog.Infof("delete policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
996
                                                        if err := c.deletePolicyRouteForCentralizedSubnet(subnet); err != nil {
×
997
                                                                klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
×
998
                                                                return err
×
999
                                                        }
×
1000
                                                } else {
×
1001
                                                        klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
UNCOV
1002
                                                        if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
UNCOV
1003
                                                                klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
×
UNCOV
1004
                                                                return err
×
1005
                                                        }
×
1006
                                                }
1007
                                        }
1008
                                }
1009
                        } else {
×
1010
                                klog.Infof("reconcile policy route for centralized subnet %s", subnet.Name)
×
UNCOV
1011
                                if err := c.reconcileDefaultCentralizedSubnetRouteInDefaultVpc(subnet); err != nil {
×
UNCOV
1012
                                        klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
×
UNCOV
1013
                                        return err
×
1014
                                }
×
1015
                        }
1016
                }
1017
        }
1018
        return nil
×
1019
}
1020

1021
func (c *Controller) addPolicyRouteForCentralizedSubnetOnNode(node *v1.Node, nodeIP string) error {
×
1022
        subnets, err := c.subnetsLister.List(labels.Everything())
×
UNCOV
1023
        if err != nil {
×
1024
                klog.Errorf("failed to get subnets %v", err)
×
1025
                return err
×
1026
        }
×
1027

1028
        for _, subnet := range subnets {
×
1029
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType {
×
1030
                        continue
×
1031
                }
1032
                nodeName := node.Name
×
UNCOV
1033
                if subnet.Spec.EnableEcmp {
×
UNCOV
1034
                        if !util.GatewayContains(subnet.Spec.GatewayNode, nodeName) &&
×
1035
                                (subnet.Spec.GatewayNode != "" || !util.MatchLabelSelectors(subnet.Spec.GatewayNodeSelectors, node.Labels)) {
×
1036
                                continue
×
1037
                        }
1038

UNCOV
1039
                        for nextHop := range strings.SplitSeq(nodeIP, ",") {
×
UNCOV
1040
                                for cidrBlock := range strings.SplitSeq(subnet.Spec.CIDRBlock, ",") {
×
1041
                                        if util.CheckProtocol(cidrBlock) != util.CheckProtocol(nextHop) {
×
1042
                                                continue
×
1043
                                        }
1044

UNCOV
1045
                                        nextHops, nameIPMap, err := c.getPolicyRouteParams(cidrBlock, util.GatewayRouterPolicyPriority)
×
1046
                                        if err != nil {
×
1047
                                                klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
×
UNCOV
1048
                                                continue
×
1049
                                        }
1050
                                        if nameIPMap[nodeName] == nextHop {
×
1051
                                                continue
×
1052
                                        }
1053

1054
                                        nextHops.Add(nextHop)
×
1055
                                        if nameIPMap == nil {
×
1056
                                                nameIPMap = make(map[string]string, 1)
×
1057
                                        }
×
1058
                                        nameIPMap[nodeName] = nextHop
×
1059
                                        klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
UNCOV
1060
                                        if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
UNCOV
1061
                                                klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
×
1062
                                                return err
×
1063
                                        }
×
1064
                                }
1065
                        }
1066
                } else {
×
1067
                        if subnet.Status.ActivateGateway != nodeName {
×
1068
                                continue
×
1069
                        }
1070
                        klog.Infof("add policy route for centralized subnet %s, on node %s, ip %s", subnet.Name, nodeName, nodeIP)
×
UNCOV
1071
                        if err = c.addPolicyRouteForCentralizedSubnet(subnet, nodeName, nil, strings.Split(nodeIP, ",")); err != nil {
×
UNCOV
1072
                                klog.Errorf("failed to add active-backup policy route for centralized subnet %s: %v", subnet.Name, err)
×
1073
                                return err
×
UNCOV
1074
                        }
×
1075
                }
1076
        }
UNCOV
1077
        return nil
×
1078
}
1079

1080
func (c *Controller) addPolicyRouteForLocalDNSCacheOnNode(dnsIPs []string, nodePortName, nodeIP, nodeName string, af int) error {
1✔
1081
        if len(dnsIPs) == 0 {
1✔
UNCOV
1082
                return c.deletePolicyRouteForLocalDNSCacheOnNode(nodeName, af)
×
UNCOV
1083
        }
×
1084

1085
        var (
1✔
1086
                externalIDs = map[string]string{
1✔
1087
                        "vendor":          util.CniTypeName,
1✔
1088
                        "node":            nodeName,
1✔
1089
                        "address-family":  strconv.Itoa(af),
1✔
1090
                        "isLocalDnsCache": "true",
1✔
1091
                }
1✔
1092
                pgAs     = strings.ReplaceAll(fmt.Sprintf("%s_ip%d", nodePortName, af), "-", ".")
1✔
1093
                action   = kubeovnv1.PolicyRouteActionReroute
1✔
1094
                nextHops = []string{nodeIP}
1✔
1095
        )
1✔
1096
        matches := strset.NewWithSize(len(dnsIPs))
1✔
1097
        for _, ip := range dnsIPs {
2✔
1098
                matches.Add(fmt.Sprintf("ip%d.src == $%s && ip%d.dst == %s", af, pgAs, af, ip))
1✔
1099
        }
1✔
1100

1101
        policies, err := c.OVNNbClient.ListLogicalRouterPolicies(c.config.ClusterRouter, -1, externalIDs, true)
1✔
1102
        if err != nil {
1✔
UNCOV
1103
                klog.Errorf("failed to list logical router policies for node %q af %d: %v", nodeName, af, err)
×
UNCOV
1104
                return err
×
UNCOV
1105
        }
×
1106

1107
        for _, policy := range policies {
2✔
1108
                if policy.Priority == util.NodeRouterPolicyPriority && policy.Action == string(action) && slices.Equal(policy.Nexthops, nextHops) && matches.Has(policy.Match) {
2✔
1109
                        matches.Remove(policy.Match)
1✔
1110
                        continue
1✔
1111
                }
1112
                // delete unused policy router policy
1113
                klog.Infof("deleting logical router policy by UUID %s", policy.UUID)
1✔
1114
                if err = c.OVNNbClient.DeleteLogicalRouterPolicyByUUID(c.config.ClusterRouter, policy.UUID); err != nil {
1✔
UNCOV
1115
                        klog.Errorf("failed to delete logical router policy by UUID %s: %v", policy.UUID, err)
×
UNCOV
1116
                        return err
×
UNCOV
1117
                }
×
1118
        }
1119

1120
        for _, match := range matches.List() {
2✔
1121
                klog.Infof("add node local dns cache policy route for router %s: match %q, action %q, nexthop %q, externalID %v", c.config.ClusterRouter, match, action, nodeIP, externalIDs)
1✔
1122
                if err := c.addPolicyRouteToVpc(
1✔
1123
                        c.config.ClusterRouter,
1✔
1124
                        &kubeovnv1.PolicyRoute{
1✔
1125
                                Priority:  util.NodeRouterPolicyPriority,
1✔
1126
                                Match:     match,
1✔
1127
                                Action:    action,
1✔
1128
                                NextHopIP: nodeIP,
1✔
1129
                        },
1✔
1130
                        externalIDs,
1✔
1131
                ); err != nil {
1✔
UNCOV
1132
                        klog.Errorf("failed to add logical router policy for node %s: %v", nodeName, err)
×
UNCOV
1133
                        return err
×
UNCOV
1134
                }
×
1135
        }
1136

1137
        return nil
1✔
1138
}
1139

1140
func (c *Controller) deletePolicyRouteForLocalDNSCacheOnNode(nodeName string, af int) error {
×
1141
        policies, err := c.OVNNbClient.ListLogicalRouterPolicies(c.config.ClusterRouter, -1, map[string]string{
×
1142
                "vendor":          util.CniTypeName,
×
1143
                "node":            nodeName,
×
1144
                "address-family":  strconv.Itoa(af),
×
1145
                "isLocalDnsCache": "true",
×
1146
        }, true)
×
1147
        if err != nil {
×
1148
                klog.Errorf("failed to list logical router policies: %v", err)
×
1149
                return err
×
UNCOV
1150
        }
×
1151
        if len(policies) == 0 {
×
1152
                return nil
×
1153
        }
×
1154

1155
        for _, policy := range policies {
×
1156
                klog.Infof("delete node local dns cache policy route for router %s with match %s", c.config.ClusterRouter, policy.Match)
×
1157

×
UNCOV
1158
                if err := c.OVNNbClient.DeleteLogicalRouterPolicyByUUID(c.config.ClusterRouter, policy.UUID); err != nil {
×
1159
                        klog.Errorf("failed to delete policy route for node local dns in router %s with match %s: %v", c.config.ClusterRouter, policy.Match, err)
×
UNCOV
1160
                        return err
×
UNCOV
1161
                }
×
1162
        }
UNCOV
1163
        return nil
×
1164
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc