• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 13276279838

05 Dec 2024 10:10AM UTC coverage: 22.382% (+0.06%) from 22.323%
13276279838

Pull #4768

github

hackerain
delete legacy acls when upgrading to v1.13.x (#4742)

the acls in v1.13.x are in tier 2 rather than tier 0 in v1.12.x, the legacy acls
may cause some unexpected behaviors because acls in tier 0 have the higest priority.
we should delete legacy acls and recreate them when upgrading to v1.13.x.

Signed-off-by: suo <yugsuo@gmail.com>
Pull Request #4768: delete legacy acls when upgrading to v1.13.x (#4742)

47 of 137 new or added lines in 9 files covered. (34.31%)

14 existing lines in 5 files now uncovered.

10429 of 46596 relevant lines covered (22.38%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.37
/pkg/controller/node.go
1
package controller
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "reflect"
8
        "slices"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        goping "github.com/prometheus-community/pro-bing"
14
        "github.com/scylladb/go-set/strset"
15
        v1 "k8s.io/api/core/v1"
16
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
17
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
        "k8s.io/apimachinery/pkg/labels"
19
        "k8s.io/apimachinery/pkg/types"
20
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21
        "k8s.io/client-go/tools/cache"
22
        "k8s.io/klog/v2"
23

24
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
25
        "github.com/kubeovn/kube-ovn/pkg/ovs"
26
        "github.com/kubeovn/kube-ovn/pkg/util"
27
)
28

29
type ErrChassisNotFound struct {
30
        Chassis string
31
        Node    string
32
}
33

×
34
func (e *ErrChassisNotFound) Error() string {
×
35
        return fmt.Sprintf("chassis %s not found for node %s", e.Chassis, e.Node)
×
36
}
37

×
38
func (c *Controller) enqueueAddNode(obj interface{}) {
×
39
        var key string
×
40
        var err error
×
41
        if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
×
42
                utilruntime.HandleError(err)
43
                return
×
44
        }
×
45
        klog.V(3).Infof("enqueue add node %s", key)
×
46
        c.addNodeQueue.Add(key)
×
47
}
×
48

×
49
func nodeReady(node *v1.Node) bool {
×
50
        for _, con := range node.Status.Conditions {
×
51
                if con.Type == v1.NodeReady && con.Status == v1.ConditionTrue {
52
                        return true
53
                }
×
54
        }
55
        return false
56
}
×
57

×
58
func (c *Controller) enqueueUpdateNode(oldObj, newObj interface{}) {
×
59
        oldNode := oldObj.(*v1.Node)
×
60
        newNode := newObj.(*v1.Node)
×
61

×
62
        if nodeReady(oldNode) != nodeReady(newNode) ||
×
63
                !reflect.DeepEqual(oldNode.Annotations, newNode.Annotations) {
×
64
                var key string
×
65
                var err error
×
66
                if key, err = cache.MetaNamespaceKeyFunc(newObj); err != nil {
×
67
                        utilruntime.HandleError(err)
×
68
                        return
×
69
                }
×
70
                if len(newNode.Annotations) == 0 || newNode.Annotations[util.AllocatedAnnotation] != "true" {
71
                        klog.V(3).Infof("enqueue add node %s", key)
72
                        c.addNodeQueue.Add(key)
73
                } else {
×
74
                        klog.V(3).Infof("enqueue update node %s", key)
×
75
                        c.updateNodeQueue.Add(key)
×
76
                }
×
77
        }
×
78
}
×
79

×
80
func (c *Controller) enqueueDeleteNode(obj interface{}) {
81
        var key string
×
82
        var err error
×
83
        if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
×
84
                utilruntime.HandleError(err)
85
                return
86
        }
1✔
87
        klog.V(3).Infof("enqueue delete node %s", key)
1✔
88

1✔
89
        c.deletingNodeObjMap.Store(key, obj.(*v1.Node))
1✔
90
        c.deleteNodeQueue.Add(key)
×
91
}
×
92

×
93
func nodeUnderlayAddressSetName(node string, af int) string {
94
        return fmt.Sprintf("node_%s_underlay_v%d", strings.ReplaceAll(node, "-", "_"), af)
2✔
95
}
1✔
96

1✔
NEW
97
// for upgrading from v1.12.x to v1.13.x
×
98
func (c *Controller) upgradeNodesToV1_13() error {
99
        // clear legacy acls in tier 0 for node port group
1✔
NEW
100
        nodes, err := c.nodesLister.List(labels.Everything())
×
NEW
101
        if err != nil {
×
NEW
102
                klog.Errorf("failed to list nodes: %v", err)
×
103
                return err
104
        }
105

1✔
106
        for _, node := range nodes {
107
                pgName := strings.ReplaceAll(node.Annotations[util.PortNameAnnotation], "-", ".")
108
                if pgName == "" {
1✔
109
                        continue
1✔
NEW
110
                }
×
NEW
111
                if err = c.OVNNbClient.DeleteAcls(pgName, portGroupKey, "", nil, util.DefaultACLTier); err != nil {
×
NEW
112
                        klog.Errorf("delete legacy node acl for node pg %s: %v", pgName, err)
×
113
                        return err
1✔
114
                }
115
        }
NEW
116

×
NEW
117
        return nil
×
NEW
118
}
×
119

NEW
120
func (c *Controller) upgradeNodes() error {
×
NEW
121
        if err := c.upgradeNodesToV1_13(); err != nil {
×
NEW
122
                klog.Errorf("failed to upgrade nodes to v1.13.x, err: %v", err)
×
NEW
123
                return err
×
NEW
124
        }
×
NEW
125
        return nil
×
NEW
126
}
×
127

128
func (c *Controller) handleAddNode(key string) error {
×
129
        c.nodeKeyMutex.LockKey(key)
×
130
        defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
×
131

×
132
        cachedNode, err := c.nodesLister.Get(key)
×
133
        if err != nil {
×
134
                if k8serrors.IsNotFound(err) {
×
135
                        return nil
×
136
                }
137
                klog.Errorf("failed to get node %s: %v", key, err)
×
138
                return err
×
139
        }
×
140
        node := cachedNode.DeepCopy()
×
141
        klog.Infof("handle add node %s", node.Name)
142

143
        subnets, err := c.subnetsLister.List(labels.Everything())
×
144
        if err != nil {
×
145
                klog.Errorf("failed to list subnets: %v", err)
×
146
                return err
×
147
        }
×
148

×
149
        nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
150
        for _, subnet := range subnets {
151
                if subnet.Spec.Vpc != c.config.ClusterRouter {
152
                        continue
×
153
                }
×
154

×
155
                v4, v6 := util.SplitStringIP(subnet.Spec.CIDRBlock)
×
156
                if subnet.Spec.Vlan == "" && (util.CIDRContainIP(v4, nodeIPv4) || util.CIDRContainIP(v6, nodeIPv6)) {
157
                        msg := fmt.Sprintf("internal IP address of node %s is in CIDR of subnet %s, this may result in network issues", node.Name, subnet.Name)
×
158
                        klog.Warning(msg)
×
159
                        c.recorder.Eventf(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: node.Name, UID: types.UID(node.Name)}}, v1.EventTypeWarning, "NodeAddressConflictWithSubnet", msg)
×
160
                        break
×
161
                }
×
162
        }
163

×
164
        if err = c.handleNodeAnnotationsForProviderNetworks(node); err != nil {
×
165
                klog.Errorf("failed to handle annotations of node %s for provider networks: %v", node.Name, err)
×
166
                return err
×
167
        }
×
168

×
169
        subnet, err := c.subnetsLister.Get(c.config.NodeSwitch)
×
170
        if err != nil {
×
171
                klog.Errorf("failed to get node subnet: %v", err)
×
172
                return err
×
173
        }
×
174

×
175
        var v4IP, v6IP, mac string
×
176
        portName := util.NodeLspName(key)
×
177
        if node.Annotations[util.AllocatedAnnotation] == "true" && node.Annotations[util.IPAddressAnnotation] != "" && node.Annotations[util.MacAddressAnnotation] != "" {
×
178
                macStr := node.Annotations[util.MacAddressAnnotation]
×
179
                v4IP, v6IP, mac, err = c.ipam.GetStaticAddress(portName, portName, node.Annotations[util.IPAddressAnnotation],
180
                        &macStr, node.Annotations[util.LogicalSwitchAnnotation], true)
181
                if err != nil {
×
182
                        klog.Errorf("failed to alloc static ip addrs for node %v: %v", node.Name, err)
×
183
                        return err
×
184
                }
×
185
        } else {
×
186
                v4IP, v6IP, mac, err = c.ipam.GetRandomAddress(portName, portName, nil, c.config.NodeSwitch, "", nil, true)
187
                if err != nil {
×
188
                        klog.Errorf("failed to alloc random ip addrs for node %v: %v", node.Name, err)
×
189
                        return err
×
190
                }
191
        }
192

×
193
        ipStr := util.GetStringIP(v4IP, v6IP)
×
194
        if err := c.OVNNbClient.CreateBareLogicalSwitchPort(c.config.NodeSwitch, portName, ipStr, mac); err != nil {
×
195
                klog.Errorf("failed to create logical switch port %s: %v", portName, err)
×
196
                return err
×
197
        }
×
198

×
199
        for _, ip := range strings.Split(ipStr, ",") {
×
200
                if ip == "" {
×
201
                        continue
×
202
                }
×
203

×
204
                nodeIP, af := nodeIPv4, 4
×
205
                protocol := util.CheckProtocol(ip)
×
206
                if protocol == kubeovnv1.ProtocolIPv6 {
×
207
                        nodeIP, af = nodeIPv6, 6
×
208
                }
×
209
                if nodeIP != "" {
×
210
                        var (
×
211
                                match       = fmt.Sprintf("ip%d.dst == %s", af, nodeIP)
×
212
                                action      = kubeovnv1.PolicyRouteActionReroute
×
213
                                externalIDs = map[string]string{
×
214
                                        "vendor":         util.CniTypeName,
×
215
                                        "node":           node.Name,
×
216
                                        "address-family": strconv.Itoa(af),
×
217
                                }
×
218
                        )
×
219
                        klog.Infof("add policy route for router: %s, match %s, action %s, nexthop %s, externalID %v", c.config.ClusterRouter, match, action, ip, externalIDs)
×
220
                        if err = c.addPolicyRouteToVpc(
×
221
                                c.config.ClusterRouter,
222
                                &kubeovnv1.PolicyRoute{
×
223
                                        Priority:  util.NodeRouterPolicyPriority,
×
224
                                        Match:     match,
×
225
                                        Action:    action,
×
226
                                        NextHopIP: ip,
×
227
                                },
228
                                externalIDs,
229
                        ); err != nil {
×
230
                                klog.Errorf("failed to add logical router policy for node %s: %v", node.Name, err)
×
231
                                return err
×
232
                        }
×
233

234
                        dnsIPs := make([]string, 0, len(c.config.NodeLocalDNSIPs))
235
                        for _, ip := range c.config.NodeLocalDNSIPs {
236
                                if util.CheckProtocol(ip) == protocol {
×
237
                                        dnsIPs = append(dnsIPs, ip)
×
238
                                }
×
239
                        }
×
240

241
                        if err = c.addPolicyRouteForLocalDNSCacheOnNode(dnsIPs, portName, ip, node.Name, af); err != nil {
×
242
                                klog.Errorf("failed to add policy route for node %s: %v", node.Name, err)
×
243
                                return err
×
244
                        }
×
245
                }
×
246
        }
×
247

×
248
        if err := c.addNodeGatewayStaticRoute(); err != nil {
×
249
                klog.Errorf("failed to add static route for node gw: %v", err)
×
250
                return err
×
251
        }
×
252

×
253
        annotations := map[string]any{
×
254
                util.IPAddressAnnotation:     ipStr,
255
                util.MacAddressAnnotation:    mac,
×
256
                util.CidrAnnotation:          subnet.Spec.CIDRBlock,
×
257
                util.GatewayAnnotation:       subnet.Spec.Gateway,
×
258
                util.LogicalSwitchAnnotation: c.config.NodeSwitch,
×
259
                util.AllocatedAnnotation:     "true",
260
                util.PortNameAnnotation:      portName,
×
261
        }
×
262
        if err = util.UpdateNodeAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, annotations); err != nil {
×
263
                klog.Errorf("failed to update annotations of node %s: %v", node.Name, err)
264
                return err
×
265
        }
×
266

×
267
        if err := c.createOrUpdateIPCR("", "", ipStr, mac, c.config.NodeSwitch, "", node.Name, ""); err != nil {
×
268
                klog.Errorf("failed to create or update IPs %s: %v", portName, err)
269
                return err
×
270
        }
271

272
        for _, subnet := range subnets {
273
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWDistributedType {
×
274
                        continue
×
275
                }
×
276
                if err = c.createPortGroupForDistributedSubnet(node, subnet); err != nil {
×
277
                        klog.Errorf("failed to create port group for node %s and subnet %s: %v", node.Name, subnet.Name, err)
×
278
                        return err
279
                }
×
280
                if err = c.addPolicyRouteForDistributedSubnet(subnet, node.Name, v4IP, v6IP); err != nil {
×
281
                        klog.Errorf("failed to add policy router for node %s and subnet %s: %v", node.Name, subnet.Name, err)
×
282
                        return err
×
283
                }
284
                // policy route for overlay distributed subnet should be reconciled when node ip changed
×
285
                c.addOrUpdateSubnetQueue.Add(subnet.Name)
×
286
        }
×
287

×
288
        // ovn acl doesn't support address_set name with '-', so replace '-' by '.'
289
        pgName := strings.ReplaceAll(portName, "-", ".")
×
290
        if err = c.OVNNbClient.CreatePortGroup(pgName, map[string]string{networkPolicyKey: "node" + "/" + key}); err != nil {
×
291
                klog.Errorf("create port group %s for node %s: %v", pgName, key, err)
×
292
                return err
×
293
        }
×
294

295
        if err := c.addPolicyRouteForCentralizedSubnetOnNode(node.Name, ipStr); err != nil {
296
                klog.Errorf("failed to add policy route for node %s, %v", key, err)
×
297
                return err
×
298
        }
×
299

×
300
        if err := c.UpdateChassisTag(node); err != nil {
×
301
                klog.Errorf("failed to update chassis tag for node %s: %v", node.Name, err)
×
302
                return err
303
        }
×
304

×
305
        if err := c.retryDelDupChassis(util.ChassisRetryMaxTimes, util.ChassisControllerRetryInterval, c.cleanDuplicatedChassis, node); err != nil {
×
306
                klog.Errorf("failed to clean duplicated chassis for node %s: %v", node.Name, err)
×
307
                return err
×
308
        }
×
309
        return nil
×
310
}
×
311

×
312
func (c *Controller) handleNodeAnnotationsForProviderNetworks(node *v1.Node) error {
×
313
        providerNetworks, err := c.providerNetworksLister.List(labels.Everything())
×
314
        if err != nil && !k8serrors.IsNotFound(err) {
315
                klog.Errorf("failed to list provider networks: %v", err)
×
316
                return err
×
317
        }
×
318

×
319
        for _, pn := range providerNetworks {
×
320
                excludeAnno := fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name)
321
                interfaceAnno := fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)
322

×
323
                var newPn *kubeovnv1.ProviderNetwork
×
324
                excluded := slices.Contains(pn.Spec.ExcludeNodes, node.Name)
×
325
                if !excluded && len(node.Annotations) != 0 && node.Annotations[excludeAnno] == "true" {
×
326
                        newPn = pn.DeepCopy()
×
327
                        newPn.Spec.ExcludeNodes = append(newPn.Spec.ExcludeNodes, node.Name)
×
328
                        excluded = true
×
329
                }
×
330

×
331
                var customInterface string
×
332
                for _, v := range pn.Spec.CustomInterfaces {
333
                        if slices.Contains(v.Nodes, node.Name) {
334
                                customInterface = v.Interface
×
335
                                break
×
336
                        }
×
337
                }
×
338
                if customInterface == "" && len(node.Annotations) != 0 {
×
339
                        if customInterface = node.Annotations[interfaceAnno]; customInterface != "" {
×
340
                                if newPn == nil {
341
                                        newPn = pn.DeepCopy()
342
                                }
343
                                var index *int
×
344
                                for i := range newPn.Spec.CustomInterfaces {
×
345
                                        if newPn.Spec.CustomInterfaces[i].Interface == customInterface {
×
346
                                                index = &i
×
347
                                                break
×
348
                                        }
349
                                }
350
                                if index != nil {
×
351
                                        newPn.Spec.CustomInterfaces[*index].Nodes = append(newPn.Spec.CustomInterfaces[*index].Nodes, node.Name)
×
352
                                } else {
×
353
                                        ci := kubeovnv1.CustomInterface{Interface: customInterface, Nodes: []string{node.Name}}
×
354
                                        newPn.Spec.CustomInterfaces = append(newPn.Spec.CustomInterfaces, ci)
×
355
                                }
×
356
                        }
357
                }
358

×
359
                if newPn != nil {
×
360
                        if newPn, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Update(context.Background(), newPn, metav1.UpdateOptions{}); err != nil {
×
361
                                klog.Errorf("failed to update provider network %s: %v", pn.Name, err)
×
362
                                return err
×
363
                        }
×
364
                }
365

×
366
                if len(node.Annotations) != 0 {
×
367
                        newNode := node.DeepCopy()
×
368
                        delete(newNode.Annotations, excludeAnno)
×
369
                        delete(newNode.Annotations, interfaceAnno)
×
370
                        if len(newNode.Annotations) != len(node.Annotations) {
×
371
                                if _, err = c.config.KubeClient.CoreV1().Nodes().Update(context.Background(), newNode, metav1.UpdateOptions{}); err != nil {
372
                                        klog.Errorf("failed to update node %s: %v", node.Name, err)
373
                                        return err
374
                                }
375
                        }
×
376
                }
377

378
                if excluded {
×
379
                        if newPn == nil {
×
380
                                newPn = pn.DeepCopy()
×
381
                        } else {
×
382
                                newPn = newPn.DeepCopy()
×
383
                        }
×
384

×
385
                        if newPn.Status.EnsureNodeStandardConditions(node.Name) {
386
                                _, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().UpdateStatus(context.Background(), newPn, metav1.UpdateOptions{})
×
387
                                if err != nil {
×
388
                                        klog.Errorf("failed to update status of provider network %s: %v", pn.Name, err)
×
389
                                        return err
×
390
                                }
×
391
                        }
×
392
                }
×
393
        }
×
394

×
395
        return nil
×
396
}
×
397

×
398
func (c *Controller) handleDeleteNode(key string) (err error) {
399
        c.nodeKeyMutex.LockKey(key)
400
        defer func() {
×
401
                _ = c.nodeKeyMutex.UnlockKey(key)
×
402
                if err == nil {
×
403
                        c.deletingNodeObjMap.Delete(key)
×
404
                }
×
405
        }()
×
406
        klog.Infof("handle delete node %s", key)
×
407

×
408
        node, ok := c.deletingNodeObjMap.Load(key)
×
409
        if !ok {
×
410
                return nil
×
411
        }
412
        n, _ := c.nodesLister.Get(key)
×
413
        if n != nil && n.UID != node.UID {
×
414
                klog.Warningf("Node %s is adding, skip the node delete handler, but it may leave some gc resources behind", key)
×
415
                return nil
×
416
        }
×
417
        return c.deleteNode(key)
418
}
419

420
func (c *Controller) deleteNode(key string) error {
×
421
        portName := util.NodeLspName(key)
×
422
        klog.Infof("delete logical switch port %s", portName)
×
423
        if err := c.OVNNbClient.DeleteLogicalSwitchPort(portName); err != nil {
×
424
                klog.Errorf("failed to delete node switch port %s: %v", portName, err)
×
425
                return err
426
        }
×
427
        if err := c.OVNSbClient.DeleteChassisByHost(key); err != nil {
×
428
                klog.Errorf("failed to delete chassis for node %s: %v", key, err)
×
429
                return err
×
430
        }
431

×
432
        for _, af := range [...]int{4, 6} {
×
433
                if err := c.deletePolicyRouteForLocalDNSCacheOnNode(key, af); err != nil {
×
434
                        klog.Error(err)
×
435
                        return err
×
436
                }
×
437
        }
×
438

×
439
        // ovn acl doesn't support address_set name with '-', so replace '-' by '.'
440
        pgName := strings.ReplaceAll(portName, "-", ".")
×
441
        if err := c.OVNNbClient.DeletePortGroup(pgName); err != nil {
×
442
                klog.Errorf("delete port group %s for node: %v", portName, err)
×
443
                return err
×
444
        }
×
445

×
446
        if err := c.deletePolicyRouteForNode(key, portName); err != nil {
×
447
                klog.Errorf("failed to delete policy route for node %s: %v", key, err)
×
448
                return err
449
        }
×
450

×
451
        if err := c.OVNNbClient.DeleteAddressSet(nodeUnderlayAddressSetName(key, 4)); err != nil {
×
452
                klog.Errorf("failed to delete address set for node %s: %v", key, err)
×
453
                return err
×
454
        }
455
        if err := c.OVNNbClient.DeleteAddressSet(nodeUnderlayAddressSetName(key, 6)); err != nil {
×
456
                klog.Errorf("failed to delete address set for node %s: %v", key, err)
×
457
                return err
×
458
        }
×
459

460
        klog.Infof("release node port %s", portName)
×
461
        c.ipam.ReleaseAddressByPod(portName, c.config.NodeSwitch)
462

463
        providerNetworks, err := c.providerNetworksLister.List(labels.Everything())
×
464
        if err != nil && !k8serrors.IsNotFound(err) {
×
465
                klog.Errorf("failed to list provider networks: %v", err)
×
466
                return err
×
467
        }
×
468

×
469
        for _, pn := range providerNetworks {
×
470
                if err = c.updateProviderNetworkForNodeDeletion(pn, key); err != nil {
×
471
                        klog.Error(err)
×
472
                        return err
×
473
                }
×
474
        }
×
475
        klog.Infof("delete node ip %s", portName)
×
476
        if err = c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), portName, metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) {
×
477
                return err
×
478
        }
×
479

×
480
        return nil
×
481
}
482

483
func (c *Controller) updateProviderNetworkForNodeDeletion(pn *kubeovnv1.ProviderNetwork, node string) error {
484
        // update provider network status
×
485
        var needUpdate bool
×
486
        newPn := pn.DeepCopy()
×
487
        if slices.Contains(newPn.Status.ReadyNodes, node) {
×
488
                newPn.Status.ReadyNodes = util.RemoveString(newPn.Status.ReadyNodes, node)
×
489
                needUpdate = true
490
        }
×
491
        if newPn.Status.RemoveNodeConditions(node) {
×
492
                needUpdate = true
×
493
        }
×
494
        if needUpdate {
×
495
                var err error
×
496
                newPn, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().UpdateStatus(context.Background(), newPn, metav1.UpdateOptions{})
×
497
                if err != nil {
×
498
                        klog.Errorf("failed to update status of provider network %s: %v", pn.Name, err)
×
499
                        return err
×
500
                }
501
        }
×
502

×
503
        // update provider network spec
×
504
        pn, newPn = newPn, nil
×
505
        if excludeNodes := util.RemoveString(pn.Spec.ExcludeNodes, node); len(excludeNodes) != len(pn.Spec.ExcludeNodes) {
×
506
                newPn = pn.DeepCopy()
×
507
                newPn.Spec.ExcludeNodes = excludeNodes
×
508
        }
×
509

×
510
        var changed bool
511
        customInterfaces := make([]kubeovnv1.CustomInterface, 0, len(pn.Spec.CustomInterfaces))
512
        for _, ci := range pn.Spec.CustomInterfaces {
×
513
                nodes := util.RemoveString(ci.Nodes, node)
514
                if !changed {
515
                        changed = len(nodes) == 0 || len(nodes) != len(ci.Nodes)
×
516
                }
×
517
                if len(nodes) != 0 {
×
518
                        customInterfaces = append(customInterfaces, kubeovnv1.CustomInterface{Interface: ci.Interface, Nodes: nodes})
×
519
                }
×
520
        }
×
521
        if changed {
×
522
                newPn = pn.DeepCopy()
×
523
                newPn.Spec.CustomInterfaces = customInterfaces
×
524
        }
×
525
        if newPn != nil {
×
526
                if _, err := c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Update(context.Background(), newPn, metav1.UpdateOptions{}); err != nil {
×
527
                        klog.Errorf("failed to update provider network %s: %v", pn.Name, err)
528
                        return err
529
                }
×
530
        }
×
531

×
532
        return nil
×
533
}
534

×
535
func (c *Controller) handleUpdateNode(key string) error {
×
536
        c.nodeKeyMutex.LockKey(key)
×
537
        defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
×
538
        klog.Infof("handle update node %s", key)
×
539

540
        node, err := c.nodesLister.Get(key)
×
541
        if err != nil {
×
542
                if k8serrors.IsNotFound(err) {
×
543
                        return nil
×
544
                }
×
545
                klog.Errorf("failed to get node %s: %v", key, err)
×
546
                return err
×
547
        }
×
548

549
        if err = c.handleNodeAnnotationsForProviderNetworks(node); err != nil {
×
550
                klog.Errorf("failed to handle annotations of node %s for provider networks: %v", node.Name, err)
×
551
                return err
×
552
        }
×
553

×
554
        subnets, err := c.subnetsLister.List(labels.Everything())
×
555
        if err != nil {
×
556
                klog.Errorf("failed to get subnets %v", err)
557
                return err
558
        }
559

×
560
        if err := c.UpdateChassisTag(node); err != nil {
561
                klog.Errorf("failed to update chassis tag for node %s: %v", node.Name, err)
562
                return err
×
563
        }
×
564
        if err := c.retryDelDupChassis(util.ChassisRetryMaxTimes, util.ChassisControllerRetryInterval, c.cleanDuplicatedChassis, node); err != nil {
×
565
                klog.Errorf("failed to clean duplicated chassis for node %s: %v", node.Name, err)
×
566
                return err
567
        }
568

×
569
        for _, cachedSubnet := range subnets {
×
570
                subnet := cachedSubnet.DeepCopy()
×
571
                if util.GatewayContains(subnet.Spec.GatewayNode, node.Name) {
×
572
                        if err := c.reconcileOvnDefaultVpcRoute(subnet); err != nil {
×
573
                                klog.Error(err)
×
574
                                return err
×
575
                        }
×
576
                }
×
577
        }
×
578

×
579
        return nil
×
580
}
581

×
582
func (c *Controller) CheckGatewayReady() {
×
583
        if err := c.checkGatewayReady(); err != nil {
×
584
                klog.Errorf("failed to check gateway ready %v", err)
×
585
        }
×
586
}
×
587

×
588
func (c *Controller) checkGatewayReady() error {
×
589
        klog.V(3).Infoln("start to check gateway status")
590
        subnetList, err := c.subnetsLister.List(labels.Everything())
×
591
        if err != nil {
×
592
                klog.Errorf("failed to list subnets %v", err)
×
593
                return err
594
        }
595
        nodes, err := c.nodesLister.List(labels.Everything())
×
596
        if err != nil {
×
597
                klog.Errorf("failed to list nodes, %v", err)
×
598
                return err
×
599
        }
×
600

601
        for _, subnet := range subnetList {
×
602
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) ||
×
603
                        subnet.Spec.GatewayNode == "" ||
×
604
                        subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType ||
×
605
                        !subnet.Spec.EnableEcmp {
×
606
                        continue
607
                }
608

×
609
                for _, node := range nodes {
×
610
                        ipStr := node.Annotations[util.IPAddressAnnotation]
×
611
                        for _, ip := range strings.Split(ipStr, ",") {
×
612
                                for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
×
613
                                        if util.CheckProtocol(cidrBlock) != util.CheckProtocol(ip) {
×
614
                                                continue
×
615
                                        }
×
616

×
617
                                        exist, err := c.checkPolicyRouteExistForNode(node.Name, cidrBlock, ip, util.GatewayRouterPolicyPriority)
×
618
                                        if err != nil {
×
619
                                                klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
×
620
                                                break
×
621
                                        }
×
622
                                        nextHops, nameIPMap, err := c.getPolicyRouteParas(cidrBlock, util.GatewayRouterPolicyPriority)
×
623
                                        if err != nil {
×
624
                                                klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
×
625
                                                break
×
626
                                        }
×
627

×
628
                                        if util.GatewayContains(subnet.Spec.GatewayNode, node.Name) {
×
629
                                                pinger, err := goping.NewPinger(ip)
×
630
                                                if err != nil {
631
                                                        return fmt.Errorf("failed to init pinger, %w", err)
×
632
                                                }
×
633
                                                pinger.SetPrivileged(true)
×
634

×
635
                                                count := 5
×
636
                                                pinger.Count = count
×
637
                                                pinger.Timeout = time.Duration(count) * time.Second
×
638
                                                pinger.Interval = 1 * time.Second
×
639

×
640
                                                success := false
×
641

×
642
                                                pinger.OnRecv = func(_ *goping.Packet) {
×
643
                                                        success = true
×
644
                                                        pinger.Stop()
×
645
                                                }
×
646
                                                if err = pinger.Run(); err != nil {
×
647
                                                        klog.Errorf("failed to run pinger for destination %s: %v", ip, err)
×
648
                                                        return err
649
                                                }
×
650

×
651
                                                if !nodeReady(node) {
×
652
                                                        success = false
×
653
                                                }
×
654

×
655
                                                if !success {
×
656
                                                        if exist {
×
657
                                                                klog.Warningf("failed to ping ovn0 %s or node %s is not ready, delete ecmp policy route for node", ip, node.Name)
×
658
                                                                nextHops.Remove(ip)
×
659
                                                                delete(nameIPMap, node.Name)
×
660
                                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
661
                                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
662
                                                                        klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
663
                                                                        return err
664
                                                                }
×
665
                                                        }
×
666
                                                } else {
×
667
                                                        klog.V(3).Infof("succeed to ping gw %s", ip)
×
668
                                                        if !exist {
×
669
                                                                nextHops.Add(ip)
×
670
                                                                if nameIPMap == nil {
×
671
                                                                        nameIPMap = make(map[string]string, 1)
×
672
                                                                }
×
673
                                                                nameIPMap[node.Name] = ip
674
                                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
675
                                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
676
                                                                        klog.Errorf("failed to add ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
677
                                                                        return err
678
                                                                }
×
679
                                                        }
680
                                                }
681
                                        } else if exist {
×
682
                                                klog.Infof("subnet %s gatewayNode does not contains node %v, delete policy route for node ip %s", subnet.Name, node.Name, ip)
×
683
                                                nextHops.Remove(ip)
×
684
                                                delete(nameIPMap, node.Name)
×
685
                                                klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
686
                                                if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
687
                                                        klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
×
688
                                                        return err
×
689
                                                }
×
690
                                        }
×
691
                                }
×
692
                        }
×
693
                }
×
694
        }
695
        return nil
×
696
}
697

698
func (c *Controller) cleanDuplicatedChassis(node *v1.Node) error {
×
699
        // if multi chassis has the same node name, delete all of them
×
700
        chassises, err := c.OVNSbClient.GetAllChassisByHost(node.Name)
×
701
        if err != nil {
×
702
                klog.Errorf("failed to list chassis %v", err)
×
703
                return err
×
704
        }
×
705
        if len(*chassises) > 1 {
×
706
                klog.Warningf("node %s has multiple chassis", node.Name)
×
707
                if err := c.OVNSbClient.DeleteChassisByHost(node.Name); err != nil {
×
708
                        klog.Errorf("failed to delete chassis for node %s: %v", node.Name, err)
709
                        return err
×
710
                }
711
        }
×
712
        return nil
×
713
}
×
714

×
715
func (c *Controller) retryDelDupChassis(attempts, sleep int, f func(node *v1.Node) error, node *v1.Node) (err error) {
×
716
        i := 0
×
717
        for ; ; i++ {
×
718
                err = f(node)
719
                if err == nil {
720
                        return
×
721
                }
×
722
                klog.Errorf("failed to delete duplicated chassis for node %s: %v", node.Name, err)
×
723
                if i >= (attempts - 1) {
×
724
                        break
×
725
                }
726
                time.Sleep(time.Duration(sleep) * time.Second)
×
727
        }
×
728
        if i >= (attempts - 1) {
×
729
                errMsg := errors.New("exhausting all attempts")
×
730
                klog.Error(errMsg)
×
731
                return errMsg
×
732
        }
×
733
        klog.V(3).Infof("finish check chassis")
734
        return nil
×
735
}
×
736

×
737
func (c *Controller) fetchPodsOnNode(nodeName string, pods []*v1.Pod) ([]string, error) {
738
        ports := make([]string, 0, len(pods))
739
        for _, pod := range pods {
×
740
                if !isPodAlive(pod) || pod.Spec.HostNetwork || pod.Spec.NodeName != nodeName || pod.Annotations[util.LogicalRouterAnnotation] != c.config.ClusterRouter {
×
741
                        continue
×
742
                }
743
                podName := c.getNameByPod(pod)
744

×
745
                podNets, err := c.getPodKubeovnNets(pod)
746
                if err != nil {
747
                        klog.Errorf("failed to get pod nets %v", err)
×
748
                        return nil, err
×
749
                }
×
750

×
751
                for _, podNet := range podNets {
752
                        if !isOvnSubnet(podNet.Subnet) {
753
                                continue
×
754
                        }
×
755

×
756
                        if pod.Annotations != nil && pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "true" {
×
757
                                ports = append(ports, ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName))
×
758
                        }
×
759
                }
×
760
        }
×
761
        return ports, nil
×
762
}
×
763

764
func (c *Controller) CheckNodePortGroup() {
×
765
        if err := c.checkAndUpdateNodePortGroup(); err != nil {
×
766
                klog.Errorf("check node port group status: %v", err)
×
767
        }
×
768
}
×
769

770
func (c *Controller) checkAndUpdateNodePortGroup() error {
×
771
        klog.V(3).Infoln("start to check node port-group status")
×
772
        np, _ := c.npsLister.List(labels.Everything())
×
773
        networkPolicyExists := len(np) != 0
×
774

×
775
        nodes, err := c.nodesLister.List(labels.Everything())
×
776
        if err != nil {
×
777
                klog.Errorf("list nodes: %v", err)
×
778
                return err
×
779
        }
×
780

×
781
        pods, err := c.podsLister.List(labels.Everything())
×
782
        if err != nil {
×
783
                klog.Errorf("list pods, %v", err)
×
784
                return err
×
785
        }
×
786

×
787
        for _, node := range nodes {
×
788
                // The port-group should already created when add node
×
789
                pgName := strings.ReplaceAll(node.Annotations[util.PortNameAnnotation], "-", ".")
×
790

×
791
                // use join IP only when no internal IP exists
792
                nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
×
793
                joinIP := node.Annotations[util.IPAddressAnnotation]
×
794
                joinIPv4, joinIPv6 := util.SplitStringIP(joinIP)
×
795
                if nodeIPv4 == "" {
×
796
                        nodeIPv4 = joinIPv4
797
                }
×
798
                if nodeIPv6 == "" {
×
799
                        nodeIPv6 = joinIPv6
×
800
                }
×
801
                nodeIP := strings.Trim(fmt.Sprintf("%s,%s", nodeIPv4, nodeIPv6), ",")
×
802

×
803
                nodePorts, err := c.fetchPodsOnNode(node.Name, pods)
×
804
                if err != nil {
×
805
                        klog.Errorf("fetch pods for node %v: %v", node.Name, err)
×
806
                        return err
807
                }
808

809
                if err = c.OVNNbClient.PortGroupSetPorts(pgName, nodePorts); err != nil {
×
810
                        klog.Errorf("add ports to port group %s: %v", pgName, err)
811
                        return err
812
                }
×
813

×
814
                if networkPolicyExists {
×
815
                        if err := c.OVNNbClient.CreateNodeACL(pgName, nodeIP, joinIP); err != nil {
×
816
                                klog.Errorf("create node acl for node pg %s: %v", pgName, err)
×
817
                        }
×
818
                } else {
×
819
                        // clear all acl
×
NEW
820
                        if err = c.OVNNbClient.DeleteAcls(pgName, portGroupKey, "", nil, util.NilACLTier); err != nil {
×
821
                                klog.Errorf("delete node acl for node pg %s: %v", pgName, err)
×
822
                        }
×
823
                }
×
824
        }
×
825

×
826
        return nil
×
827
}
×
828

×
829
func (c *Controller) UpdateChassisTag(node *v1.Node) error {
×
830
        annoChassisName := node.Annotations[util.ChassisAnnotation]
×
831
        if annoChassisName == "" {
×
832
                // kube-ovn-cni not ready to set chassis
×
833
                return nil
834
        }
835
        chassis, err := c.OVNSbClient.GetChassis(annoChassisName, true)
×
836
        if err != nil {
×
837
                klog.Errorf("failed to get chassis %s for node %s: %v", annoChassisName, node.Name, err)
×
838
                return err
×
839
        }
×
840
        if chassis == nil {
×
841
                klog.Infof("chassis %q not registered for node %s, do chassis gc once", annoChassisName, node.Name)
×
842
                // chassis name conflict, do GC
843
                if err = c.gcChassis(); err != nil {
×
844
                        klog.Errorf("failed to gc chassis: %v", err)
845
                        return err
846
                }
×
847
                err = &ErrChassisNotFound{Chassis: annoChassisName, Node: node.Name}
×
848
                klog.Error(err)
×
849
                return err
×
850
        }
×
851

×
852
        if chassis.ExternalIDs == nil || chassis.ExternalIDs["vendor"] != util.CniTypeName {
×
853
                klog.Infof("init tag %s for node %s chassis %s", util.CniTypeName, node.Name, chassis.Name)
×
854
                if err = c.OVNSbClient.UpdateChassisTag(chassis.Name, node.Name); err != nil {
×
855
                        err := fmt.Errorf("failed to init chassis tag, %w", err)
×
856
                        klog.Error(err)
×
857
                        return err
858
                }
×
859
        }
×
860
        return nil
×
861
}
×
862

×
863
func (c *Controller) addNodeGatewayStaticRoute() error {
864
        // If user not manage static route for default vpc, just add route about ovn-default to join
865
        if vpc, err := c.vpcsLister.Get(c.config.ClusterRouter); err != nil || vpc.Spec.StaticRoutes != nil {
×
866
                existRoute, err := c.OVNNbClient.ListLogicalRouterStaticRoutes(c.config.ClusterRouter, nil, nil, "", nil)
×
867
                if err != nil {
×
868
                        klog.Errorf("failed to get vpc %s static route list, %v", c.config.ClusterRouter, err)
×
869
                }
×
870
                if len(existRoute) != 0 {
×
871
                        klog.Infof("skip add static route for node gw")
×
872
                        return nil
×
873
                }
×
874
        }
×
875
        dstCidr := "0.0.0.0/0,::/0"
×
876
        for _, cidrBlock := range strings.Split(dstCidr, ",") {
×
877
                for _, nextHop := range strings.Split(c.config.NodeSwitchGateway, ",") {
878
                        if util.CheckProtocol(cidrBlock) != util.CheckProtocol(nextHop) {
879
                                continue
×
880
                        }
881

882
                        if err := c.addStaticRouteToVpc(
×
883
                                c.config.ClusterRouter,
×
884
                                &kubeovnv1.StaticRoute{
×
885
                                        Policy:     kubeovnv1.PolicyDst,
×
886
                                        CIDR:       cidrBlock,
×
887
                                        NextHopIP:  nextHop,
×
888
                                        RouteTable: util.MainRouteTable,
×
889
                                },
×
890
                        ); err != nil {
×
891
                                klog.Errorf("failed to add static route for node gw: %v", err)
×
892
                                return err
×
893
                        }
×
894
                }
×
895
        }
×
896
        return nil
×
897
}
898

899
func (c *Controller) getPolicyRouteParas(cidr string, priority int) (*strset.Set, map[string]string, error) {
×
900
        ipSuffix := "ip4"
×
901
        if util.CheckProtocol(cidr) == kubeovnv1.ProtocolIPv6 {
×
902
                ipSuffix = "ip6"
×
903
        }
×
904
        match := fmt.Sprintf("%s.src == %s", ipSuffix, cidr)
×
905
        policyList, err := c.OVNNbClient.GetLogicalRouterPolicy(c.config.ClusterRouter, priority, match, true)
906
        if err != nil {
×
907
                klog.Errorf("failed to get logical router policy: %v", err)
×
908
                return nil, nil, err
×
909
        }
×
910
        if len(policyList) == 0 {
911
                return strset.New(), map[string]string{}, nil
×
912
        }
×
913
        return strset.New(policyList[0].Nexthops...), policyList[0].ExternalIDs, nil
×
914
}
×
915

×
916
func (c *Controller) checkPolicyRouteExistForNode(nodeName, cidr, nexthop string, priority int) (bool, error) {
917
        _, nameIPMap, err := c.getPolicyRouteParas(cidr, priority)
918
        if err != nil {
×
919
                klog.Errorf("failed to get policy route paras, %v", err)
×
920
                return false, err
×
921
        }
922
        if nodeIP, ok := nameIPMap[nodeName]; ok && nodeIP == nexthop {
923
                return true, nil
×
924
        }
×
925
        return false, nil
×
926
}
×
927

×
928
func (c *Controller) deletePolicyRouteForNode(nodeName, portName string) error {
×
929
        subnets, err := c.subnetsLister.List(labels.Everything())
930
        if err != nil {
×
931
                klog.Errorf("get subnets: %v", err)
×
932
                return err
×
933
        }
×
934

×
935
        addresses := c.ipam.GetPodAddress(portName)
936
        for _, addr := range addresses {
937
                if addr.IP == "" {
×
938
                        continue
×
939
                }
×
940
                klog.Infof("deleting logical router policy with nexthop %q from %s for node %s", addr.IP, c.config.ClusterRouter, nodeName)
×
941
                if err = c.OVNNbClient.DeleteLogicalRouterPolicyByNexthop(c.config.ClusterRouter, util.NodeRouterPolicyPriority, addr.IP); err != nil {
×
942
                        klog.Errorf("failed to delete logical router policy with nexthop %q from %s for node %s: %v", addr.IP, c.config.ClusterRouter, nodeName, err)
×
943
                        return err
×
944
                }
945
        }
946

×
947
        for _, subnet := range subnets {
×
948
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch {
×
949
                        continue
×
950
                }
951

×
952
                if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
×
953
                        pgName := getOverlaySubnetsPortGroupName(subnet.Name, nodeName)
×
954
                        if err = c.OVNNbClient.DeletePortGroup(pgName); err != nil {
×
955
                                klog.Errorf("delete port group for subnet %s and node %s: %v", subnet.Name, nodeName, err)
×
956
                                return err
×
957
                        }
×
958

×
959
                        klog.Infof("delete policy route for distributed subnet %s, node %s", subnet.Name, nodeName)
×
960
                        if err = c.deletePolicyRouteForDistributedSubnet(subnet, nodeName); err != nil {
×
961
                                klog.Errorf("delete policy route for subnet %s and node %s: %v", subnet.Name, nodeName, err)
×
962
                                return err
×
963
                        }
×
964
                }
×
965

×
966
                if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType {
×
967
                        if subnet.Spec.EnableEcmp {
968
                                for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
969
                                        nextHops, nameIPMap, err := c.getPolicyRouteParas(cidrBlock, util.GatewayRouterPolicyPriority)
970
                                        if err != nil {
×
971
                                                klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
×
972
                                                continue
×
973
                                        }
×
974

×
975
                                        exist := false
×
976
                                        if _, ok := nameIPMap[nodeName]; ok {
977
                                                exist = true
978
                                        }
979

×
980
                                        if exist {
981
                                                nextHops.Remove(nameIPMap[nodeName])
982
                                                delete(nameIPMap, nodeName)
×
983

×
984
                                                if nextHops.Size() == 0 {
×
985
                                                        klog.Infof("delete policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
986
                                                        if err := c.deletePolicyRouteForCentralizedSubnet(subnet); err != nil {
×
987
                                                                klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
×
988
                                                                return err
989
                                                        }
×
990
                                                } else {
×
991
                                                        klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
992
                                                        if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
993
                                                                klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
994
                                                                return err
×
995
                                                        }
×
996
                                                }
×
997
                                        }
998
                                }
999
                        } else {
×
1000
                                klog.Infof("reconcile policy route for centralized subnet %s", subnet.Name)
×
1001
                                if err := c.reconcileDefaultCentralizedSubnetRouteInDefaultVpc(subnet); err != nil {
×
1002
                                        klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
×
1003
                                        return err
1004
                                }
1005
                        }
×
1006
                }
×
1007
        }
×
1008
        return nil
×
1009
}
1010

×
1011
func (c *Controller) addPolicyRouteForCentralizedSubnetOnNode(nodeName, nodeIP string) error {
×
1012
        subnets, err := c.subnetsLister.List(labels.Everything())
1013
        if err != nil {
1014
                klog.Errorf("failed to get subnets %v", err)
×
1015
                return err
×
1016
        }
×
1017

×
1018
        for _, subnet := range subnets {
×
1019
                if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != c.config.ClusterRouter || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType {
×
1020
                        continue
×
1021
                }
×
1022

×
1023
                if subnet.Spec.EnableEcmp {
×
1024
                        if !util.GatewayContains(subnet.Spec.GatewayNode, nodeName) {
1025
                                continue
1026
                        }
×
1027

×
1028
                        for _, nextHop := range strings.Split(nodeIP, ",") {
×
1029
                                for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
1030
                                        if util.CheckProtocol(cidrBlock) != util.CheckProtocol(nextHop) {
×
1031
                                                continue
×
1032
                                        }
×
1033
                                        exist, err := c.checkPolicyRouteExistForNode(nodeName, cidrBlock, nextHop, util.GatewayRouterPolicyPriority)
×
1034
                                        if err != nil {
×
1035
                                                klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
1036
                                                continue
1037
                                        }
×
1038
                                        if exist {
1039
                                                continue
1040
                                        }
×
1041

×
1042
                                        nextHops, nameIPMap, err := c.getPolicyRouteParas(cidrBlock, util.GatewayRouterPolicyPriority)
×
1043
                                        if err != nil {
×
1044
                                                klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
1045
                                                continue
×
1046
                                        }
×
1047
                                        nextHops.Add(nextHop)
×
1048
                                        if nameIPMap == nil {
×
1049
                                                nameIPMap = make(map[string]string, 1)
×
1050
                                        }
×
1051
                                        nameIPMap[nodeName] = nextHop
×
1052
                                        klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
×
1053
                                        if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIPMap); err != nil {
×
1054
                                                klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
×
1055
                                                return err
×
1056
                                        }
×
1057
                                }
×
1058
                        }
×
1059
                } else {
×
1060
                        if subnet.Status.ActivateGateway != nodeName {
1061
                                continue
×
1062
                        }
×
1063
                        klog.Infof("add policy route for centralized subnet %s, on node %s, ip %s", subnet.Name, nodeName, nodeIP)
×
1064
                        if err = c.addPolicyRouteForCentralizedSubnet(subnet, nodeName, nil, strings.Split(nodeIP, ",")); err != nil {
×
1065
                                klog.Errorf("failed to add active-backup policy route for centralized subnet %s: %v", subnet.Name, err)
×
1066
                                return err
1067
                        }
×
1068
                }
×
1069
        }
×
1070
        return nil
1071
}
×
1072

×
1073
func (c *Controller) addPolicyRouteForLocalDNSCacheOnNode(dnsIPs []string, nodePortName, nodeIP, nodeName string, af int) error {
×
1074
        if len(dnsIPs) == 0 {
1075
                return c.deletePolicyRouteForLocalDNSCacheOnNode(nodeName, af)
1076
        }
×
1077

×
1078
        policies, err := c.OVNNbClient.GetLogicalRouterPoliciesByExtID(c.config.ClusterRouter, "node", nodeName)
×
1079
        if err != nil {
×
1080
                klog.Errorf("failed to list logical router policies with external-ids:node = %q: %v", nodeName, err)
×
1081
                return err
1082
        }
1083

×
1084
        var (
×
1085
                externalIDs = map[string]string{
×
1086
                        "vendor":          util.CniTypeName,
×
1087
                        "node":            nodeName,
×
1088
                        "address-family":  strconv.Itoa(af),
×
1089
                        "isLocalDnsCache": "true",
×
1090
                }
×
1091
                pgAs     = strings.ReplaceAll(fmt.Sprintf("%s_ip%d", nodePortName, af), "-", ".")
×
1092
                action   = kubeovnv1.PolicyRouteActionReroute
×
1093
                nextHops = []string{nodeIP}
×
1094
        )
×
1095
        matches := strset.NewWithSize(len(dnsIPs))
×
1096
        for _, ip := range dnsIPs {
×
1097
                matches.Add(fmt.Sprintf("ip%d.src == $%s && ip%d.dst == %s", af, pgAs, af, ip))
×
1098
        }
1099

1100
        for _, policy := range policies {
×
1101
                if len(policy.ExternalIDs) == 0 || policy.ExternalIDs["vendor"] != util.CniTypeName || policy.ExternalIDs["isLocalDnsCache"] != "true" {
1102
                        continue
1103
                }
×
1104
                if policy.Priority == util.NodeRouterPolicyPriority && policy.Action == string(action) && slices.Equal(policy.Nexthops, nextHops) && matches.Has(policy.Match) {
×
1105
                        matches.Remove(policy.Match)
×
1106
                        continue
×
1107
                }
×
1108
                // delete unused policy router policy
×
1109
                klog.Infof("deleting logical router policy by UUID %s", policy.UUID)
×
1110
                if err = c.OVNNbClient.DeleteLogicalRouterPolicyByUUID(c.config.ClusterRouter, policy.UUID); err != nil {
×
1111
                        klog.Errorf("failed to delete logical router policy by UUID %s: %v", policy.UUID, err)
×
1112
                        return err
×
1113
                }
×
1114
        }
×
1115

×
1116
        for _, match := range matches.List() {
×
1117
                klog.Infof("add node local dns cache policy route for router %s: match %q, action %q, nexthop %q, externalID %v", c.config.ClusterRouter, match, action, nodeIP, externalIDs)
1118
                if err := c.addPolicyRouteToVpc(
×
1119
                        c.config.ClusterRouter,
×
1120
                        &kubeovnv1.PolicyRoute{
×
1121
                                Priority:  util.NodeRouterPolicyPriority,
×
1122
                                Match:     match,
×
1123
                                Action:    action,
×
1124
                                NextHopIP: nodeIP,
×
1125
                        },
1126
                        externalIDs,
×
1127
                ); err != nil {
1128
                        klog.Errorf("failed to add logical router policy for node %s: %v", nodeName, err)
1129
                        return err
1130
                }
1131
        }
1132

1133
        return nil
1134
}
1135

1136
func (c *Controller) deletePolicyRouteForLocalDNSCacheOnNode(nodeName string, af int) error {
1137
        policies, err := c.OVNNbClient.ListLogicalRouterPolicies(c.config.ClusterRouter, -1, map[string]string{
1138
                "vendor":          util.CniTypeName,
1139
                "node":            nodeName,
1140
                "address-family":  strconv.Itoa(af),
1141
                "isLocalDnsCache": "true",
1142
        }, true)
1143
        if err != nil {
1144
                klog.Errorf("failed to list logical router policies: %v", err)
1145
                return err
1146
        }
1147
        if len(policies) == 0 {
1148
                return nil
1149
        }
1150

1151
        for _, policy := range policies {
1152
                klog.Infof("delete node local dns cache policy route for router %s with match %s", c.config.ClusterRouter, policy.Match)
1153

1154
                if err := c.OVNNbClient.DeleteLogicalRouterPolicyByUUID(c.config.ClusterRouter, policy.UUID); err != nil {
1155
                        klog.Errorf("failed to delete policy route for node local dns in router %s with match %s: %v", c.config.ClusterRouter, policy.Match, err)
1156
                        return err
1157
                }
1158
        }
1159
        return nil
1160
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc