• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 27017056817

05 Jun 2026 01:15PM UTC coverage: 25.681% (+0.09%) from 25.587%
27017056817

push

github

web-flow
perf(daemon): list subnets and pods once per gateway setter (#6827)

* perf(daemon): list subnets and pods once per gateway setter

runGateway runs every 3s, and each of setIPSet/setPolicyRouting/setIptables
re-listed the whole subnet cache up to 15 times per dual-stack tick (and pods
2-4 times). List subnets/pods once per setter and thread the slice through the
helpers, dropping the now-redundant error returns whose only source was List.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

* perf(daemon): log getCidrByProtocol errors instead of dropping them

Address review feedback: getSubnetsNeedNAT, getSubnetsDistributedGateway and
getDefaultVpcSubnetsCIDR silently skipped subnets when getCidrByProtocol
failed, and StartTProxyTCPPortProbe swallowed getTProxyConditionPod errors.
Log these errors to match the existing handling in reconcileNatOutGoingPolicyIPset
and generateNatOutgoingPolicyChainRules; the error path only triggers on
malformed CIDRs, not on normal single-stack subnets.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

---------

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

0 of 80 new or added lines in 3 files covered. (0.0%)

433 existing lines in 5 files now uncovered.

14867 of 57891 relevant lines covered (25.68%)

0.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

5.45
/pkg/controller/init.go
1
package controller
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "errors"
7
        "fmt"
8
        "maps"
9
        "strings"
10
        "time"
11

12
        "github.com/scylladb/go-set/strset"
13
        v1 "k8s.io/api/core/v1"
14
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
15
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
        "k8s.io/apimachinery/pkg/labels"
17
        "k8s.io/apimachinery/pkg/types"
18
        "k8s.io/client-go/tools/cache"
19
        "k8s.io/klog/v2"
20
        "sigs.k8s.io/controller-runtime/pkg/client"
21
        "sigs.k8s.io/controller-runtime/pkg/client/config"
22
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
23

24
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
25
        "github.com/kubeovn/kube-ovn/pkg/ovs"
26
        "github.com/kubeovn/kube-ovn/pkg/ovsdb/ovnnb"
27
        "github.com/kubeovn/kube-ovn/pkg/util"
28
)
29

30
func (c *Controller) InitOVN() error {
×
31
        var err error
×
32

×
33
        // migrate vendor externalIDs to kube-ovn resources created in versions prior to v1.15.0
×
34
        // this must run before ACL cleanup to ensure existing resources are properly tagged
×
35
        if err = c.OVNNbClient.MigrateVendorExternalIDs(); err != nil {
×
36
                klog.Errorf("failed to migrate vendor externalIDs: %v", err)
×
37
                return err
×
38
        }
×
39

40
        // migrate tier field of ACL rules created in versions prior to v1.13.0
41
        // after upgrading, the tier field has a default value of zero, which is not the value used in versions >= v1.13.0
42
        // we need to migrate the tier field to the correct value
43
        if err = c.OVNNbClient.MigrateACLTier(); err != nil {
×
44
                klog.Errorf("failed to migrate ACL tier: %v", err)
×
45
                return err
×
46
        }
×
47

48
        // clean all no parent key acls
49
        if err = c.OVNNbClient.CleanNoParentKeyAcls(); err != nil {
×
50
                klog.Errorf("failed to clean all no parent key acls: %v", err)
×
51
                return err
×
52
        }
×
53

54
        if err = c.InitDefaultVpc(); err != nil {
×
55
                klog.Errorf("init default vpc failed: %v", err)
×
56
                return err
×
57
        }
×
58

59
        if err = c.initClusterRouter(); err != nil {
×
60
                klog.Errorf("init cluster router failed: %v", err)
×
61
                return err
×
62
        }
×
63

64
        if c.config.EnableLb {
×
65
                if err = c.initLoadBalancer(); err != nil {
×
66
                        klog.Errorf("init load balancer failed: %v", err)
×
67
                        return err
×
68
                }
×
69
        }
70

71
        if err = c.initDefaultVlan(); err != nil {
×
72
                klog.Errorf("init default vlan failed: %v", err)
×
73
                return err
×
74
        }
×
75

76
        if err = c.initNodeSwitch(); err != nil {
×
77
                klog.Errorf("init node switch failed: %v", err)
×
78
                return err
×
79
        }
×
80

81
        if err = c.initDefaultLogicalSwitch(); err != nil {
×
82
                klog.Errorf("init default switch failed: %v", err)
×
83
                return err
×
84
        }
×
85

86
        return nil
×
87
}
88

89
func (c *Controller) InitDefaultVpc() error {
×
90
        cachedVpc, err := c.vpcsLister.Get(c.config.ClusterRouter)
×
91
        if err != nil {
×
92
                if !k8serrors.IsNotFound(err) {
×
93
                        klog.Errorf("failed to get default vpc %q: %v", c.config.ClusterRouter, err)
×
94
                        return err
×
95
                }
×
96
                // create default vpc
97
                vpc := &kubeovnv1.Vpc{
×
98
                        ObjectMeta: metav1.ObjectMeta{Name: c.config.ClusterRouter},
×
99
                }
×
100
                cachedVpc, err = c.config.KubeOvnClient.KubeovnV1().Vpcs().Create(context.Background(), vpc, metav1.CreateOptions{})
×
101
                if err != nil {
×
102
                        klog.Errorf("failed to create default vpc %q: %v", c.config.ClusterRouter, err)
×
103
                        return err
×
104
                }
×
105
        }
106

107
        // update default vpc status
108
        vpc := cachedVpc.DeepCopy()
×
109
        if !vpc.Status.Default || !vpc.Status.Standby ||
×
110
                vpc.Status.Router != c.config.ClusterRouter ||
×
111
                vpc.Status.DefaultLogicalSwitch != c.config.DefaultLogicalSwitch {
×
112
                vpc.Status.Standby = true
×
113
                vpc.Status.Default = true
×
114
                vpc.Status.Router = c.config.ClusterRouter
×
115
                vpc.Status.DefaultLogicalSwitch = c.config.DefaultLogicalSwitch
×
116

×
117
                if _, err = c.config.KubeOvnClient.KubeovnV1().Vpcs().UpdateStatus(context.Background(), vpc, metav1.UpdateOptions{}); err != nil {
×
118
                        klog.Errorf("failed to update default vpc %q: %v", c.config.ClusterRouter, err)
×
119
                        return err
×
120
                }
×
121
        }
122

123
        return nil
×
124
}
125

126
// InitDefaultLogicalSwitch init the default logical switch for ovn network
127
func (c *Controller) initDefaultLogicalSwitch() error {
×
128
        subnet, err := c.subnetsLister.Get(c.config.DefaultLogicalSwitch)
×
129
        if err == nil {
×
130
                if subnet != nil && util.CheckProtocol(c.config.DefaultCIDR) != util.CheckProtocol(subnet.Spec.CIDRBlock) {
×
131
                        // single-stack upgrade to dual-stack
×
132
                        if util.CheckProtocol(c.config.DefaultCIDR) == kubeovnv1.ProtocolDual {
×
133
                                subnet := subnet.DeepCopy()
×
134
                                subnet.Spec.CIDRBlock = c.config.DefaultCIDR
×
135
                                if _, err = c.formatSubnet(subnet); err != nil {
×
136
                                        klog.Errorf("init format subnet %s failed: %v", c.config.DefaultLogicalSwitch, err)
×
137
                                        return err
×
138
                                }
×
139
                        }
140
                }
141
                return nil
×
142
        }
143

144
        if !k8serrors.IsNotFound(err) {
×
145
                klog.Errorf("get default subnet %s failed: %v", c.config.DefaultLogicalSwitch, err)
×
146
                return err
×
147
        }
×
148

149
        defaultSubnet := kubeovnv1.Subnet{
×
150
                ObjectMeta: metav1.ObjectMeta{Name: c.config.DefaultLogicalSwitch},
×
151
                Spec: kubeovnv1.SubnetSpec{
×
152
                        Vpc:                 c.config.ClusterRouter,
×
153
                        Default:             true,
×
154
                        Provider:            util.OvnProvider,
×
155
                        CIDRBlock:           c.config.DefaultCIDR,
×
156
                        Gateway:             c.config.DefaultGateway,
×
157
                        GatewayNode:         "",
×
158
                        DisableGatewayCheck: !c.config.DefaultGatewayCheck,
×
159
                        ExcludeIps:          strings.Split(c.config.DefaultExcludeIps, ","),
×
160
                        NatOutgoing:         true,
×
161
                        GatewayType:         kubeovnv1.GWDistributedType,
×
162
                        Protocol:            util.CheckProtocol(c.config.DefaultCIDR),
×
163
                        EnableLb:            &c.config.EnableLb,
×
164
                },
×
165
        }
×
166
        if c.config.NetworkType == util.NetworkTypeVlan {
×
167
                defaultSubnet.Spec.Vlan = c.config.DefaultVlanName
×
168
                if c.config.DefaultLogicalGateway && c.config.DefaultU2OInterconnection {
×
169
                        err = errors.New("logicalGateway and u2oInterconnection can't be opened at the same time")
×
170
                        klog.Error(err)
×
171
                        return err
×
172
                }
×
173
                defaultSubnet.Spec.LogicalGateway = c.config.DefaultLogicalGateway
×
174
                defaultSubnet.Spec.U2OInterconnection = c.config.DefaultU2OInterconnection
×
175
        }
176

177
        if _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Create(context.Background(), &defaultSubnet, metav1.CreateOptions{}); err != nil {
×
178
                klog.Errorf("failed to create default subnet %q: %v", c.config.DefaultLogicalSwitch, err)
×
179
                return err
×
180
        }
×
181
        return nil
×
182
}
183

184
// InitNodeSwitch init node switch to connect host and pod
185
func (c *Controller) initNodeSwitch() error {
×
186
        subnet, err := c.subnetsLister.Get(c.config.NodeSwitch)
×
187
        if err == nil {
×
188
                if util.CheckProtocol(c.config.NodeSwitchCIDR) == kubeovnv1.ProtocolDual && util.CheckProtocol(subnet.Spec.CIDRBlock) != kubeovnv1.ProtocolDual {
×
189
                        // single-stack upgrade to dual-stack
×
190
                        subnet := subnet.DeepCopy()
×
191
                        subnet.Spec.CIDRBlock = c.config.NodeSwitchCIDR
×
192
                        if _, err = c.formatSubnet(subnet); err != nil {
×
193
                                klog.Errorf("init format subnet %s failed: %v", c.config.NodeSwitch, err)
×
194
                                return err
×
195
                        }
×
196
                } else {
×
197
                        c.config.NodeSwitchCIDR = subnet.Spec.CIDRBlock
×
198
                }
×
199
                return nil
×
200
        }
201

202
        if !k8serrors.IsNotFound(err) {
×
203
                klog.Errorf("get node subnet %s failed: %v", c.config.NodeSwitch, err)
×
204
                return err
×
205
        }
×
206

207
        nodeSubnet := kubeovnv1.Subnet{
×
208
                ObjectMeta: metav1.ObjectMeta{Name: c.config.NodeSwitch},
×
209
                Spec: kubeovnv1.SubnetSpec{
×
210
                        Vpc:                    c.config.ClusterRouter,
×
211
                        Default:                false,
×
212
                        Provider:               util.OvnProvider,
×
213
                        CIDRBlock:              c.config.NodeSwitchCIDR,
×
214
                        Gateway:                c.config.NodeSwitchGateway,
×
215
                        GatewayNode:            "",
×
216
                        ExcludeIps:             strings.Split(c.config.NodeSwitchGateway, ","),
×
217
                        Protocol:               util.CheckProtocol(c.config.NodeSwitchCIDR),
×
218
                        DisableInterConnection: true,
×
219
                },
×
220
        }
×
221

×
222
        if _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Create(context.Background(), &nodeSubnet, metav1.CreateOptions{}); err != nil {
×
223
                klog.Errorf("failed to create node subnet %q: %v", c.config.NodeSwitch, err)
×
224
                return err
×
225
        }
×
226
        return nil
×
227
}
228

229
// InitClusterRouter init cluster router to connect different logical switches
230
func (c *Controller) initClusterRouter() error {
×
231
        if err := c.OVNNbClient.CreateLogicalRouter(c.config.ClusterRouter); err != nil {
×
232
                klog.Errorf("create logical router %s failed: %v", c.config.ClusterRouter, err)
×
233
                return err
×
234
        }
×
235

236
        lr, err := c.OVNNbClient.GetLogicalRouter(c.config.ClusterRouter, false)
×
237
        if err != nil {
×
238
                klog.Errorf("get logical router %s failed: %v", c.config.ClusterRouter, err)
×
239
                return err
×
240
        }
×
241

242
        lrOptions := make(map[string]string, len(lr.Options))
×
243
        maps.Copy(lrOptions, lr.Options)
×
244
        lrOptions["mac_binding_age_threshold"] = "300"
×
245
        lrOptions["dynamic_neigh_routers"] = "true"
×
246
        if !maps.Equal(lr.Options, lrOptions) {
×
247
                lr.Options = lrOptions
×
248
                if err = c.OVNNbClient.UpdateLogicalRouter(lr, &lr.Options); err != nil {
×
249
                        klog.Errorf("update logical router %s failed: %v", c.config.ClusterRouter, err)
×
250
                        return err
×
251
                }
×
252
        }
253

254
        return nil
×
255
}
256

257
func (c *Controller) initLB(name, protocol string, sessionAffinity bool) error {
1✔
258
        protocol = strings.ToLower(protocol)
1✔
259

1✔
260
        var (
1✔
261
                selectFields []string
1✔
262
                err          error
1✔
263
        )
1✔
264

1✔
265
        if sessionAffinity {
2✔
266
                selectFields = []string{
1✔
267
                        ovnnb.LoadBalancerSelectionFieldsIPSrc,
1✔
268
                        ovnnb.LoadBalancerSelectionFieldsIpv6Src,
1✔
269
                }
1✔
270
        }
1✔
271

272
        if err = c.OVNNbClient.CreateLoadBalancer(name, protocol, selectFields...); err != nil {
1✔
273
                klog.Errorf("create load balancer %s: %v", name, err)
×
274
                return err
×
275
        }
×
276

277
        if sessionAffinity {
2✔
278
                if err = c.OVNNbClient.SetLoadBalancerAffinityTimeout(name, util.DefaultServiceSessionStickinessTimeout); err != nil {
1✔
279
                        klog.Errorf("failed to set affinity timeout of %s load balancer %s: %v", protocol, name, err)
×
280
                        return err
×
281
                }
×
282
        }
283

284
        err = c.OVNNbClient.SetLoadBalancerPreferLocalBackend(name, c.config.EnableOVNLBPreferLocal)
1✔
285
        if err != nil {
1✔
286
                klog.Errorf("failed to set prefer local backend for load balancer %s: %v", name, err)
×
287
                return err
×
288
        }
×
289

290
        // ct_flush wipes all conntrack entries on the LB's datapath whenever a vip
291
        // is mutated. Session-affinity LBs are shared across services, and their
292
        // per-client affinity binding is carried in conntrack; enabling ct_flush on
293
        // those LBs lets an unrelated service's backend change invalidate another
294
        // service's active affinity. Only enable ct_flush on non-session UDP LBs.
295
        if protocol == "udp" && !sessionAffinity {
2✔
296
                if err = c.OVNNbClient.SetLoadBalancerCtFlush(name, true); err != nil {
1✔
297
                        klog.Errorf("failed to set ct_flush for load balancer %s: %v", name, err)
×
298
                        return err
×
299
                }
×
300
        }
301

302
        return nil
1✔
303
}
304

305
// InitLoadBalancer creates the default TCP/UDP/SCTP cluster load balancers in
306
// OVN for every existing VPC and records their names in each VPC's status so
307
// the subnet worker can attach them to its logical switch on its first
308
// reconcile.
309
//
310
// The status write uses a targeted merge patch that contains only the six
311
// LB-name fields. An earlier version serialized the whole VpcStatus via
312
// vpc.Status.Bytes() and raced InitDefaultVpc: if the VPC lister cache still
313
// held the pre-UpdateStatus copy (Standby=false) the whole-status merge patch
314
// would silently overwrite the Standby/Default/Router/DefaultLogicalSwitch
315
// fields that InitDefaultVpc had just written, deadlocking the subnet worker.
316
// A field-scoped patch avoids that class of overwrite entirely.
317
func (c *Controller) initLoadBalancer() error {
×
318
        vpcs, err := c.vpcsLister.List(labels.Everything())
×
319
        if err != nil {
×
320
                klog.Errorf("failed to list vpc: %v", err)
×
321
                return err
×
322
        }
×
323

324
        for _, cachedVpc := range vpcs {
×
325
                vpcLb := c.GenVpcLoadBalancer(cachedVpc.Name)
×
326
                if err = c.initLB(vpcLb.TCPLoadBalancer, string(v1.ProtocolTCP), false); err != nil {
×
327
                        klog.Error(err)
×
328
                        return err
×
329
                }
×
330
                if err = c.initLB(vpcLb.TCPSessLoadBalancer, string(v1.ProtocolTCP), true); err != nil {
×
331
                        klog.Error(err)
×
332
                        return err
×
333
                }
×
334
                if err = c.initLB(vpcLb.UDPLoadBalancer, string(v1.ProtocolUDP), false); err != nil {
×
335
                        klog.Error(err)
×
336
                        return err
×
337
                }
×
338
                if err = c.initLB(vpcLb.UDPSessLoadBalancer, string(v1.ProtocolUDP), true); err != nil {
×
339
                        klog.Error(err)
×
340
                        return err
×
341
                }
×
342
                if err = c.initLB(vpcLb.SctpLoadBalancer, string(v1.ProtocolSCTP), false); err != nil {
×
343
                        klog.Error(err)
×
344
                        return err
×
345
                }
×
346
                if err = c.initLB(vpcLb.SctpSessLoadBalancer, string(v1.ProtocolSCTP), true); err != nil {
×
347
                        klog.Error(err)
×
348
                        return err
×
349
                }
×
350

351
                body, err := buildVpcLBStatusPatch(vpcLb)
×
352
                if err != nil {
×
353
                        klog.Error(err)
×
354
                        return err
×
355
                }
×
356
                if _, err = c.config.KubeOvnClient.KubeovnV1().Vpcs().Patch(context.Background(), cachedVpc.Name, types.MergePatchType, body, metav1.PatchOptions{}, "status"); err != nil {
×
357
                        klog.Error(err)
×
358
                        return err
×
359
                }
×
360
        }
361
        return nil
×
362
}
363

364
// buildVpcLBStatusPatch builds a merge-patch body that updates only the six
365
// LB-name fields of VpcStatus. It deliberately excludes every other field so
366
// the merge patch cannot overwrite state owned by InitDefaultVpc (Standby,
367
// Default, Router, DefaultLogicalSwitch) when the caller reads from a stale
368
// lister cache.
369
func buildVpcLBStatusPatch(vpcLb *VpcLoadBalancer) ([]byte, error) {
1✔
370
        patch := struct {
1✔
371
                Status struct {
1✔
372
                        TCPLoadBalancer         string `json:"tcpLoadBalancer"`
1✔
373
                        TCPSessionLoadBalancer  string `json:"tcpSessionLoadBalancer"`
1✔
374
                        UDPLoadBalancer         string `json:"udpLoadBalancer"`
1✔
375
                        UDPSessionLoadBalancer  string `json:"udpSessionLoadBalancer"`
1✔
376
                        SctpLoadBalancer        string `json:"sctpLoadBalancer"`
1✔
377
                        SctpSessionLoadBalancer string `json:"sctpSessionLoadBalancer"`
1✔
378
                } `json:"status"`
1✔
379
        }{}
1✔
380
        patch.Status.TCPLoadBalancer = vpcLb.TCPLoadBalancer
1✔
381
        patch.Status.TCPSessionLoadBalancer = vpcLb.TCPSessLoadBalancer
1✔
382
        patch.Status.UDPLoadBalancer = vpcLb.UDPLoadBalancer
1✔
383
        patch.Status.UDPSessionLoadBalancer = vpcLb.UDPSessLoadBalancer
1✔
384
        patch.Status.SctpLoadBalancer = vpcLb.SctpLoadBalancer
1✔
385
        patch.Status.SctpSessionLoadBalancer = vpcLb.SctpSessLoadBalancer
1✔
386
        return json.Marshal(patch)
1✔
387
}
1✔
388

389
func (c *Controller) InitIPAM() error {
×
390
        start := time.Now()
×
391
        subnets, err := c.subnetsLister.List(labels.Everything())
×
392
        if err != nil {
×
393
                klog.Errorf("failed to list subnet: %v", err)
×
394
                return err
×
395
        }
×
396
        subnetProviderMaps := make(map[string]string, len(subnets))
×
397
        for _, subnet := range subnets {
×
398
                klog.Infof("Init subnet %s", subnet.Name)
×
399
                subnetProviderMaps[subnet.Name] = subnet.Spec.Provider
×
400
                if err := c.ipam.AddOrUpdateSubnet(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps); err != nil {
×
401
                        klog.Errorf("failed to init subnet %s: %v", subnet.Name, err)
×
402
                }
×
403

404
                u2oInterconnName := fmt.Sprintf(util.U2OInterconnName, subnet.Spec.Vpc, subnet.Name)
×
405
                u2oInterconnLrpName := fmt.Sprintf("%s-%s", subnet.Spec.Vpc, subnet.Name)
×
406
                if subnet.Status.U2OInterconnectionIP != "" {
×
407
                        var mac *string
×
408
                        klog.Infof("Init U2O for subnet %s", subnet.Name)
×
409
                        if subnet.Status.U2OInterconnectionMAC != "" {
×
410
                                mac = new(subnet.Status.U2OInterconnectionMAC)
×
411
                        } else {
×
412
                                lrp, err := c.OVNNbClient.GetLogicalRouterPort(u2oInterconnLrpName, true)
×
413
                                if err != nil {
×
414
                                        klog.Errorf("failed to get logical router port %s: %v", u2oInterconnLrpName, err)
×
415
                                        return err
×
416
                                }
×
417
                                if lrp != nil {
×
418
                                        mac = new(lrp.MAC)
×
419
                                }
×
420
                        }
421
                        if _, _, _, err = c.ipam.GetStaticAddress(u2oInterconnName, u2oInterconnLrpName, subnet.Status.U2OInterconnectionIP, mac, subnet.Name, true); err != nil {
×
422
                                klog.Errorf("failed to init subnet %q u2o interconnection ip to ipam %v", subnet.Name, err)
×
423
                        }
×
424
                }
425
        }
426

427
        ippools, err := c.ippoolLister.List(labels.Everything())
×
428
        if err != nil {
×
429
                klog.Errorf("failed to list ippool: %v", err)
×
430
                return err
×
431
        }
×
432
        for _, ippool := range ippools {
×
433
                if err = c.ipam.AddOrUpdateIPPool(ippool.Spec.Subnet, ippool.Name, ippool.Spec.IPs); err != nil {
×
434
                        klog.Errorf("failed to init ippool %s: %v", ippool.Name, err)
×
435
                }
×
436
        }
437

438
        klog.Infof("Init IPAM from StatefulSet or VM IP CR")
×
439
        ips, err := c.ipsLister.List(labels.Everything())
×
440
        if err != nil {
×
441
                klog.Errorf("failed to list IPs: %v", err)
×
442
                return err
×
443
        }
×
444

445
        for _, ip := range ips {
×
446
                if !ip.DeletionTimestamp.IsZero() {
×
447
                        klog.Infof("enqueue update for removing finalizer to delete ip %s", ip.Name)
×
448
                        c.updateIPQueue.Add(ip.Name)
×
449
                        continue
×
450
                }
451
                // recover sts and kubevirt vm ip, other ip recover in later pod loop
452
                if ip.Spec.PodType != util.KindStatefulSet &&
×
453
                        ip.Spec.PodType != util.KindVirtualMachine {
×
454
                        continue
×
455
                }
456

457
                var ipamKey string
×
458
                if ip.Spec.Namespace != "" {
×
459
                        ipamKey = fmt.Sprintf("%s/%s", ip.Spec.Namespace, ip.Spec.PodName)
×
460
                } else {
×
461
                        ipamKey = util.NodeLspName(ip.Spec.PodName)
×
462
                }
×
463
                if _, _, _, err = c.ipam.GetStaticAddress(ipamKey, ip.Name, ip.Spec.IPAddress, &ip.Spec.MacAddress, ip.Spec.Subnet, true); err != nil {
×
464
                        klog.Errorf("failed to init IPAM from IP CR %s: %v", ip.Name, err)
×
465
                }
×
466
        }
467

468
        klog.Infof("Init IPAM from pod")
×
469
        pods, err := c.podsLister.List(labels.Everything())
×
470
        if err != nil {
×
471
                klog.Errorf("failed to list pods: %v", err)
×
472
                return err
×
473
        }
×
474
        for _, pod := range pods {
×
475
                if pod.Spec.HostNetwork {
×
476
                        continue
×
477
                }
478

479
                isAlive := isPodAlive(pod)
×
480
                isStsPod, _, _ := isStatefulSetPod(pod)
×
481
                if !isAlive && !isStsPod {
×
482
                        continue
×
483
                }
484

485
                if !hasAllocatedAnnotation(pod) {
×
486
                        continue
×
487
                }
488

489
                podNets, err := c.getPodKubeovnNets(pod)
×
490
                if err != nil {
×
491
                        klog.Errorf("failed to get pod kubeovn nets %s.%s address %s: %v", pod.Name, pod.Namespace, pod.Annotations[util.IPAddressAnnotation], err)
×
492
                        continue
×
493
                }
494

495
                podType := getPodType(pod)
×
496
                podName := c.getNameByPod(pod)
×
497
                key := cache.NewObjectName(pod.Namespace, podName).String()
×
498
                for _, podNet := range podNets {
×
499
                        if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "true" {
×
500
                                portName := ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName)
×
501
                                ip := pod.Annotations[fmt.Sprintf(util.IPAddressAnnotationTemplate, podNet.ProviderName)]
×
502
                                mac := pod.Annotations[fmt.Sprintf(util.MacAddressAnnotationTemplate, podNet.ProviderName)]
×
503
                                if ip == "" {
×
504
                                        klog.Warningf("pod %s/%s has empty IP annotation for provider %s, skip IPAM init", pod.Namespace, podName, podNet.ProviderName)
×
505
                                        continue
×
506
                                }
507
                                _, _, _, err := c.ipam.GetStaticAddress(key, portName, ip, &mac, podNet.Subnet.Name, true)
×
508
                                if err != nil {
×
509
                                        klog.Errorf("failed to init pod %s.%s address %s: %v", podName, pod.Namespace, ip, err)
×
510
                                } else {
×
511
                                        err = c.createOrUpdateIPCR(portName, podName, ip, mac, podNet.Subnet.Name, pod.Namespace, pod.Spec.NodeName, podType)
×
512
                                        if err != nil {
×
513
                                                klog.Errorf("failed to create/update ips CR %s.%s with ip address %s: %v", podName, pod.Namespace, ip, err)
×
514
                                        }
×
515
                                }
516

517
                                // Append ExternalIds is added in v1.7, used for upgrading from v1.6.3. It should be deleted now since v1.7 is not used anymore.
518
                        }
519
                }
520
        }
521

522
        klog.Infof("Init IPAM from vip CR")
×
523
        vips, err := c.virtualIpsLister.List(labels.Everything())
×
524
        if err != nil {
×
525
                klog.Errorf("failed to list vips: %v", err)
×
526
                return err
×
527
        }
×
528
        for _, vip := range vips {
×
529
                provider, ok := subnetProviderMaps[vip.Spec.Subnet]
×
530
                if !ok {
×
531
                        klog.Errorf("failed to find subnet %s for vip %s", vip.Spec.Subnet, vip.Name)
×
532
                        continue
×
533
                }
534
                portName := ovs.PodNameToPortName(vip.Name, vip.Spec.Namespace, provider)
×
535
                if _, _, _, err = c.ipam.GetStaticAddress(vip.Name, portName, vip.Status.V4ip, &vip.Status.Mac, vip.Spec.Subnet, true); err != nil {
×
536
                        klog.Errorf("failed to init ipam from vip cr %s: %v", vip.Name, err)
×
537
                }
×
538
        }
539

540
        klog.Infof("Init IPAM from iptables EIP CR")
×
541
        eips, err := c.iptablesEipsLister.List(labels.Everything())
×
542
        if err != nil {
×
543
                klog.Errorf("failed to list EIPs: %v", err)
×
544
                return err
×
545
        }
×
546
        for _, eip := range eips {
×
547
                externalNetwork := util.GetExternalNetwork(eip.Spec.ExternalSubnet)
×
548
                if _, _, _, err = c.ipam.GetStaticAddress(eip.Name, eip.Name, eip.Status.IP, &eip.Spec.MacAddress, externalNetwork, true); err != nil {
×
549
                        klog.Errorf("failed to init ipam from iptables eip cr %s: %v", eip.Name, err)
×
550
                }
×
551
        }
552

553
        klog.Infof("Init IPAM from ovn EIP CR")
×
554
        oeips, err := c.ovnEipsLister.List(labels.Everything())
×
555
        if err != nil {
×
556
                klog.Errorf("failed to list ovn eips: %v", err)
×
557
                return err
×
558
        }
×
559
        for _, oeip := range oeips {
×
560
                if _, _, _, err = c.ipam.GetStaticAddress(oeip.Name, oeip.Name, oeip.Status.V4Ip, &oeip.Status.MacAddress, oeip.Spec.ExternalSubnet, true); err != nil {
×
561
                        klog.Errorf("failed to init ipam from ovn eip cr %s: %v", oeip.Name, err)
×
562
                }
×
563
        }
564

565
        klog.Infof("Init IPAM from node annotation")
×
566
        nodes, err := c.nodesLister.List(labels.Everything())
×
567
        if err != nil {
×
568
                klog.Errorf("failed to list nodes: %v", err)
×
569
                return err
×
570
        }
×
571
        for _, node := range nodes {
×
572
                if node.Annotations[util.AllocatedAnnotation] == "true" {
×
573
                        portName := util.NodeLspName(node.Name)
×
574
                        mac := node.Annotations[util.MacAddressAnnotation]
×
575
                        v4IP, v6IP, _, err := c.ipam.GetStaticAddress(portName, portName,
×
576
                                node.Annotations[util.IPAddressAnnotation], &mac,
×
577
                                node.Annotations[util.LogicalSwitchAnnotation], true)
×
578
                        if err != nil {
×
579
                                klog.Errorf("failed to init node %s.%s address %s: %v", node.Name, node.Namespace, node.Annotations[util.IPAddressAnnotation], err)
×
580
                        }
×
581
                        if v4IP != "" && v6IP != "" {
×
582
                                ipStr := util.GetStringIP(v4IP, v6IP)
×
583
                                if ipStr != node.Annotations[util.IPAddressAnnotation] {
×
584
                                        patch := util.KVPatch{util.IPAddressAnnotation: ipStr}
×
585
                                        if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
×
586
                                                klog.Errorf("failed to patch node %s IP annotation: %v", node.Name, err)
×
587
                                        }
×
588
                                }
589
                        }
590
                }
591
        }
592

593
        klog.Infof("take %.2f seconds to initialize IPAM", time.Since(start).Seconds())
×
594
        return nil
×
595
}
596

597
func (c *Controller) initDefaultProviderNetwork() error {
×
598
        _, err := c.providerNetworksLister.Get(c.config.DefaultProviderName)
×
599
        if err == nil {
×
600
                return nil
×
601
        }
×
602
        if !k8serrors.IsNotFound(err) {
×
603
                klog.Errorf("failed to get default provider network %s: %v", c.config.DefaultProviderName, err)
×
604
                return err
×
605
        }
×
606

607
        nodes, err := c.nodesLister.List(labels.Everything())
×
608
        if err != nil {
×
609
                klog.Errorf("failed to get nodes: %v", err)
×
610
                return err
×
611
        }
×
612

613
        pn := kubeovnv1.ProviderNetwork{
×
614
                ObjectMeta: metav1.ObjectMeta{
×
615
                        Name: c.config.DefaultProviderName,
×
616
                },
×
617
                Spec: kubeovnv1.ProviderNetworkSpec{
×
618
                        DefaultInterface: c.config.DefaultHostInterface,
×
619
                        ExchangeLinkName: c.config.DefaultExchangeLinkName,
×
620
                },
×
621
        }
×
622

×
623
        excludeAnno := fmt.Sprintf(util.ProviderNetworkExcludeTemplate, c.config.DefaultProviderName)
×
624
        interfaceAnno := fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, c.config.DefaultProviderName)
×
625
        patchNodes := make([]string, 0, len(nodes))
×
626
        for _, node := range nodes {
×
627
                if len(node.Annotations) == 0 {
×
628
                        continue
×
629
                }
630

631
                if node.Annotations[excludeAnno] == "true" {
×
632
                        pn.Spec.ExcludeNodes = append(pn.Spec.ExcludeNodes, node.Name)
×
633
                        patchNodes = append(patchNodes, node.Name)
×
634
                } else if s := node.Annotations[interfaceAnno]; s != "" {
×
635
                        var index *int
×
636
                        for i := range pn.Spec.CustomInterfaces {
×
637
                                if pn.Spec.CustomInterfaces[i].Interface == s {
×
638
                                        index = &i
×
639
                                        break
×
640
                                }
641
                        }
642
                        if index != nil {
×
643
                                pn.Spec.CustomInterfaces[*index].Nodes = append(pn.Spec.CustomInterfaces[*index].Nodes, node.Name)
×
644
                        } else {
×
645
                                ci := kubeovnv1.CustomInterface{Interface: s, Nodes: []string{node.Name}}
×
646
                                pn.Spec.CustomInterfaces = append(pn.Spec.CustomInterfaces, ci)
×
647
                        }
×
648
                        patchNodes = append(patchNodes, node.Name)
×
649
                }
650
        }
651

652
        defer func() {
×
653
                if err != nil {
×
654
                        return
×
655
                }
×
656

657
                // update nodes only when provider network has been created successfully
658
                patch := util.KVPatch{excludeAnno: nil, interfaceAnno: nil}
×
659
                for _, node := range patchNodes {
×
660
                        if err := util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node, patch); err != nil {
×
661
                                klog.Errorf("failed to patch node %s: %v", node, err)
×
662
                        }
×
663
                }
664
        }()
665

666
        _, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Create(context.Background(), &pn, metav1.CreateOptions{})
×
667
        if err != nil {
×
668
                klog.Errorf("failed to create provider network %s: %v", c.config.DefaultProviderName, err)
×
669
                return err
×
670
        }
×
671
        return nil
×
672
}
673

674
func (c *Controller) initDefaultVlan() error {
×
675
        if c.config.NetworkType != util.NetworkTypeVlan {
×
676
                return nil
×
677
        }
×
678

679
        if err := c.initDefaultProviderNetwork(); err != nil {
×
680
                klog.Error(err)
×
681
                return err
×
682
        }
×
683

684
        _, err := c.vlansLister.Get(c.config.DefaultVlanName)
×
685
        if err == nil {
×
686
                return nil
×
687
        }
×
688

689
        if !k8serrors.IsNotFound(err) {
×
690
                klog.Errorf("get default vlan %s failed: %v", c.config.DefaultVlanName, err)
×
691
                return err
×
692
        }
×
693

694
        if c.config.DefaultVlanID < 0 || c.config.DefaultVlanID > 4095 {
×
695
                return errors.New("the default vlan id is not between 1-4095")
×
696
        }
×
697

698
        defaultVlan := kubeovnv1.Vlan{
×
699
                ObjectMeta: metav1.ObjectMeta{Name: c.config.DefaultVlanName},
×
700
                Spec: kubeovnv1.VlanSpec{
×
701
                        ID:       c.config.DefaultVlanID,
×
702
                        Provider: c.config.DefaultProviderName,
×
703
                },
×
704
        }
×
705

×
706
        _, err = c.config.KubeOvnClient.KubeovnV1().Vlans().Create(context.Background(), &defaultVlan, metav1.CreateOptions{})
×
707
        if err != nil {
×
708
                klog.Errorf("failed to create vlan %s: %v", defaultVlan.Name, err)
×
709
                return err
×
710
        }
×
711
        return nil
×
712
}
713

714
func (c *Controller) syncIPCR() error {
×
715
        klog.Info("start to sync ips")
×
716
        ips, err := c.ipsLister.List(labels.Everything())
×
717
        if err != nil {
×
718
                if k8serrors.IsNotFound(err) {
×
719
                        return nil
×
720
                }
×
721
                klog.Error(err)
×
722
                return err
×
723
        }
724

725
        vmLsps, err := c.getVMLsps()
×
726
        if err != nil {
×
727
                klog.Errorf("failed to get vm lsps, %v", err)
×
728
                return err
×
729
        }
×
730
        ipMap := strset.New(vmLsps...)
×
UNCOV
731
        for _, ip := range ips {
×
732
                if !ip.DeletionTimestamp.IsZero() {
×
733
                        klog.Infof("enqueue update for removing finalizer to delete ip %s", ip.Name)
×
734
                        c.updateIPQueue.Add(ip.Name)
×
735
                        continue
×
736
                }
737
                changed := false
×
UNCOV
738
                ip = ip.DeepCopy()
×
739
                if ipMap.Has(ip.Name) && ip.Spec.PodType == "" {
×
740
                        ip.Spec.PodType = util.KindVirtualMachine
×
741
                        changed = true
×
UNCOV
742
                }
×
743

744
                v4IP, v6IP := util.SplitStringIP(ip.Spec.IPAddress)
×
745
                if ip.Spec.V4IPAddress == v4IP && ip.Spec.V6IPAddress == v6IP && !changed {
×
746
                        continue
×
747
                }
748

749
                ip.Spec.V4IPAddress = v4IP
×
750
                ip.Spec.V6IPAddress = v6IP
×
UNCOV
751
                _, err := c.config.KubeOvnClient.KubeovnV1().IPs().Update(context.Background(), ip, metav1.UpdateOptions{})
×
752
                if err != nil {
×
UNCOV
753
                        klog.Errorf("failed to sync crd ip %s: %v", ip.Spec.IPAddress, err)
×
UNCOV
754
                        return err
×
755
                }
×
756
        }
757
        return nil
×
758
}
759

760
func (c *Controller) syncSubnetCR() error {
×
761
        klog.Info("start to sync subnets")
×
762
        subnets, err := c.subnetsLister.List(labels.Everything())
×
763
        if err != nil {
×
UNCOV
764
                if k8serrors.IsNotFound(err) {
×
765
                        return nil
×
766
                }
×
767
                klog.Error(err)
×
768
                return err
×
769
        }
UNCOV
770
        for _, cachedSubnet := range subnets {
×
771
                subnet := cachedSubnet.DeepCopy()
×
772
                if !subnet.Status.IsReady() {
×
773
                        klog.Warningf("subnet %s is not ready", subnet.Name)
×
774
                        continue
×
775
                }
UNCOV
776
                subnet, err = c.calcSubnetStatusIP(subnet)
×
UNCOV
777
                if err != nil {
×
778
                        klog.Errorf("failed to calculate subnet %s used ip: %v", cachedSubnet.Name, err)
×
779
                        return err
×
780
                }
×
781

782
                // only sync subnet spec enableEcmp when subnet.Spec.EnableEcmp is false and c.config.EnableEcmp is true
783
                if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType && !subnet.Spec.EnableEcmp && subnet.Spec.EnableEcmp != c.config.EnableEcmp {
×
UNCOV
784
                        subnet, err = c.subnetsLister.Get(subnet.Name)
×
785
                        if err != nil {
×
786
                                klog.Errorf("failed to get subnet %s: %v", subnet.Name, err)
×
787
                                return err
×
788
                        }
×
789

UNCOV
790
                        subnet.Spec.EnableEcmp = c.config.EnableEcmp
×
UNCOV
791
                        if _, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Update(context.Background(), subnet, metav1.UpdateOptions{}); err != nil {
×
792
                                klog.Errorf("failed to sync subnet spec enableEcmp with kube-ovn-controller config enableEcmp %s: %v", subnet.Name, err)
×
UNCOV
793
                                return err
×
UNCOV
794
                        }
×
795
                }
796
        }
797
        return nil
×
798
}
799

800
func (c *Controller) syncVpcNatGatewayCR() error {
×
801
        klog.Info("start to sync crd vpc nat gw")
×
802
        gws, err := c.vpcNatGatewayLister.List(labels.Everything())
×
803
        if err != nil {
×
804
                klog.Errorf("failed to list vpc nat gateway, %v", err)
×
UNCOV
805
                return err
×
806
        }
×
807
        if len(gws) == 0 {
×
808
                return nil
×
809
        }
×
810
        // get vpc nat gateway enable state
811
        cm, err := c.configMapsLister.ConfigMaps(c.config.PodNamespace).Get(util.VpcNatGatewayConfig)
×
812
        if err != nil && !k8serrors.IsNotFound(err) {
×
813
                klog.Errorf("failed to get config map %s, %v", util.VpcNatGatewayConfig, err)
×
UNCOV
814
                return err
×
815
        }
×
816
        if k8serrors.IsNotFound(err) || cm.Data["enable-vpc-nat-gw"] == "false" {
×
817
                return nil
×
818
        }
×
819
        // get vpc nat gateway image
820
        cm, err = c.configMapsLister.ConfigMaps(c.config.PodNamespace).Get(util.VpcNatConfig)
×
821
        if err != nil {
×
822
                if k8serrors.IsNotFound(err) {
×
UNCOV
823
                        klog.Errorf("should set config map for vpc-nat-gateway %s, %v", util.VpcNatConfig, err)
×
UNCOV
824
                        return err
×
825
                }
×
826
                klog.Errorf("failed to get config map %s, %v", util.VpcNatConfig, err)
×
827
                return err
×
828
        }
829

UNCOV
830
        if cm.Data["image"] == "" {
×
831
                err = errors.New("should set image for vpc-nat-gateway pod")
×
832
                klog.Error(err)
×
833
                return err
×
834
        }
×
835

UNCOV
836
        for _, gw := range gws {
×
837
                if err := c.updateCrdNatGwLabels(gw.Name, ""); err != nil {
×
UNCOV
838
                        klog.Errorf("failed to update nat gw %s: %v", gw.Name, err)
×
UNCOV
839
                        return err
×
840
                }
×
841
        }
842
        return nil
×
843
}
844

845
func (c *Controller) syncVlanCR() error {
×
846
        klog.Info("start to sync vlans")
×
847
        vlans, err := c.vlansLister.List(labels.Everything())
×
848
        if err != nil {
×
UNCOV
849
                if k8serrors.IsNotFound(err) {
×
UNCOV
850
                        return nil
×
851
                }
×
852
                klog.Error(err)
×
853
                return err
×
854
        }
855

856
        for _, vlan := range vlans {
×
857
                var needUpdate bool
×
858
                newVlan := vlan.DeepCopy()
×
UNCOV
859
                if newVlan.Spec.VlanID != 0 && newVlan.Spec.ID == 0 {
×
860
                        newVlan.Spec.ID = newVlan.Spec.VlanID
×
861
                        newVlan.Spec.VlanID = 0
×
862
                        needUpdate = true
×
863
                }
×
864
                //nolint:staticcheck // Ignore SA1019 for backward compatibility of deprecated field ProviderInterfaceName
865
                if newVlan.Spec.ProviderInterfaceName != "" && newVlan.Spec.Provider == "" {
×
866
                        //nolint:staticcheck // Ignore SA1019
×
867
                        newVlan.Spec.Provider = newVlan.Spec.ProviderInterfaceName
×
868
                        //nolint:staticcheck // Ignore SA1019
×
869
                        newVlan.Spec.ProviderInterfaceName = ""
×
870
                        needUpdate = true
×
871
                }
×
UNCOV
872
                if needUpdate {
×
UNCOV
873
                        if _, err = c.config.KubeOvnClient.KubeovnV1().Vlans().Update(context.Background(), newVlan, metav1.UpdateOptions{}); err != nil {
×
UNCOV
874
                                klog.Errorf("failed to update spec of vlan %s: %v", newVlan.Name, err)
×
875
                                return err
×
UNCOV
876
                        }
×
877
                }
878
        }
879

880
        return nil
×
881
}
882

883
func (c *Controller) batchMigrateNodeRoute(nodes []*v1.Node) error {
×
884
        start := time.Now()
×
885
        addPolicies := make([]*kubeovnv1.PolicyRoute, 0)
×
886
        delPolicies := make([]*kubeovnv1.PolicyRoute, 0)
×
887
        staticRoutes := make([]*kubeovnv1.StaticRoute, 0)
×
UNCOV
888
        externalIDsMap := make(map[string]map[string]string)
×
889
        delAsNames := make([]string, 0)
×
890
        for _, node := range nodes {
×
891
                if node.Annotations[util.AllocatedAnnotation] != "true" {
×
892
                        continue
×
893
                }
894
                nodeName := node.Name
×
895
                nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
×
896
                joinAddrV4, joinAddrV6 := util.SplitStringIP(node.Annotations[util.IPAddressAnnotation])
×
897
                if nodeIPv4 != "" && joinAddrV4 != "" {
×
UNCOV
898
                        buildNodeRoute(4, nodeName, joinAddrV4, nodeIPv4, &addPolicies, &delPolicies, &staticRoutes, externalIDsMap, &delAsNames)
×
UNCOV
899
                }
×
900
                if nodeIPv6 != "" && joinAddrV6 != "" {
×
901
                        buildNodeRoute(6, nodeName, joinAddrV6, nodeIPv6, &addPolicies, &delPolicies, &staticRoutes, externalIDsMap, &delAsNames)
×
902
                }
×
903
        }
904

905
        if err := c.batchAddPolicyRouteToVpc(c.config.ClusterRouter, addPolicies, externalIDsMap); err != nil {
×
906
                klog.Errorf("failed to batch add logical router policy for lr %s nodes %d: %v", c.config.ClusterRouter, len(nodes), err)
×
907
                return err
×
908
        }
×
909
        if err := c.batchDeleteStaticRouteFromVpc(c.config.ClusterRouter, staticRoutes); err != nil {
×
910
                klog.Errorf("failed to batch delete  obsolete logical router static route for lr %s nodes %d: %v", c.config.ClusterRouter, len(nodes), err)
×
911
                return err
×
912
        }
×
913
        if err := c.batchDeletePolicyRouteFromVpc(c.config.ClusterRouter, delPolicies); err != nil {
×
914
                klog.Errorf("failed to batch delete obsolete logical router policy for lr %s nodes %d: %v", c.config.ClusterRouter, len(nodes), err)
×
915
                return err
×
916
        }
×
917
        if err := c.OVNNbClient.BatchDeleteAddressSetByNames(delAsNames); err != nil {
×
918
                klog.Errorf("failed to batch delete obsolete address set for asNames %v nodes %d: %v", delAsNames, len(nodes), err)
×
919
                return err
×
UNCOV
920
        }
×
UNCOV
921
        klog.V(3).Infof("take to %v batch migrate node route for router: %s priority: %d add policy len: %d externalID len: %d del policy len: %d del address set len: %d",
×
922
                time.Since(start), c.config.ClusterRouter, util.NodeRouterPolicyPriority, len(addPolicies), len(externalIDsMap), len(delPolicies), len(delAsNames))
×
923

×
924
        return nil
×
925
}
926

927
func buildNodeRoute(af int, nodeName, nexthop, ip string, addPolicies, delPolicies *[]*kubeovnv1.PolicyRoute, staticRoutes *[]*kubeovnv1.StaticRoute, externalIDsMap map[string]map[string]string, delAsNames *[]string) {
×
928
        var (
×
929
                match       = fmt.Sprintf("ip%d.dst == %s", af, ip)
×
930
                action      = kubeovnv1.PolicyRouteActionReroute
×
931
                externalIDs = map[string]string{
×
932
                        "vendor": util.CniTypeName,
×
933
                        "node":   nodeName,
×
934
                }
×
935
        )
×
936
        *addPolicies = append(*addPolicies, &kubeovnv1.PolicyRoute{
×
937
                Priority:  util.NodeRouterPolicyPriority,
×
938
                Match:     match,
×
939
                Action:    action,
×
940
                NextHopIP: nexthop,
×
941
        })
×
942
        externalIDsMap[buildExternalIDsMapKey(match, string(action), util.NodeRouterPolicyPriority)] = externalIDs
×
943
        *staticRoutes = append(*staticRoutes, &kubeovnv1.StaticRoute{
×
944
                Policy:     kubeovnv1.PolicyDst,
×
945
                RouteTable: util.MainRouteTable,
×
946
                NextHopIP:  "",
×
947
                CIDR:       ip,
×
948
        })
×
949
        asName := nodeUnderlayAddressSetName(nodeName, af)
×
950
        obsoleteMatch := fmt.Sprintf("ip%d.dst == %s && ip%d.src != $%s", af, ip, af, asName)
×
951
        *delPolicies = append(*delPolicies, &kubeovnv1.PolicyRoute{
×
UNCOV
952
                Match:    obsoleteMatch,
×
953
                Priority: util.NodeRouterPolicyPriority,
×
954
        })
×
955
        *delAsNames = append(*delAsNames, asName)
×
956
}
×
957

958
func (c *Controller) syncNodeRoutes() error {
×
UNCOV
959
        nodes, err := c.nodesLister.List(labels.Everything())
×
960
        if err != nil {
×
961
                klog.Errorf("failed to list nodes: %v", err)
×
962
                return err
×
963
        }
×
964

965
        if err := c.batchMigrateNodeRoute(nodes); err != nil {
×
UNCOV
966
                klog.Errorf("failed to batch migrate node routes: %v", err)
×
UNCOV
967
                return err
×
968
        }
×
969

970
        return nil
×
971
}
972

973
func (c *Controller) initNodeChassis() error {
×
974
        nodes, err := c.nodesLister.List(labels.Everything())
×
975
        if err != nil {
×
976
                klog.Errorf("failed to list nodes: %v", err)
×
977
                return err
×
978
        }
×
979
        chassises, err := c.OVNSbClient.GetKubeOvnChassises()
×
980
        if err != nil {
×
981
                klog.Errorf("failed to get chassis nodes: %v", err)
×
982
                return err
×
983
        }
×
984
        chassisNodes := make(map[string]string, len(*chassises))
×
985
        for _, chassis := range *chassises {
×
986
                chassisNodes[chassis.Name] = chassis.Hostname
×
987
        }
×
988
        for _, node := range nodes {
×
UNCOV
989
                if err := c.UpdateChassisTag(node); err != nil {
×
UNCOV
990
                        klog.Error(err)
×
991
                        if _, ok := err.(*ErrChassisNotFound); !ok {
×
UNCOV
992
                                return err
×
UNCOV
993
                        }
×
994
                }
995
        }
996
        return nil
×
997
}
998

UNCOV
999
func migrateFinalizers(c client.Client, list client.ObjectList, getObjectItem func(int) (client.Object, client.Object)) error {
×
1000
        if err := c.List(context.Background(), list); err != nil {
×
1001
                klog.Errorf("failed to list objects: %v", err)
×
1002
                return err
×
1003
        }
×
1004

UNCOV
1005
        var i int
×
1006
        var cachedObj, patchedObj client.Object
×
1007
        for {
×
1008
                if cachedObj, patchedObj = getObjectItem(i); cachedObj == nil {
×
UNCOV
1009
                        break
×
1010
                }
1011
                if !controllerutil.ContainsFinalizer(cachedObj, util.DeprecatedFinalizerName) {
×
1012
                        i++
×
1013
                        continue
×
1014
                }
1015
                controllerutil.RemoveFinalizer(patchedObj, util.DeprecatedFinalizerName)
×
1016
                if cachedObj.GetDeletionTimestamp() == nil {
×
1017
                        // if the object is not being deleted, add the new finalizer
×
1018
                        controllerutil.AddFinalizer(patchedObj, util.KubeOVNControllerFinalizer)
×
1019
                }
×
1020
                if err := c.Patch(context.Background(), patchedObj, client.MergeFrom(cachedObj)); client.IgnoreNotFound(err) != nil {
×
1021
                        klog.Errorf("failed to sync finalizers for %s %s: %v",
×
UNCOV
1022
                                patchedObj.GetObjectKind().GroupVersionKind().Kind,
×
UNCOV
1023
                                cache.MetaObjectToName(patchedObj), err)
×
1024
                        return err
×
UNCOV
1025
                }
×
UNCOV
1026
                i++
×
1027
        }
1028

1029
        return nil
×
1030
}
1031

1032
func (c *Controller) syncFinalizers() error {
×
UNCOV
1033
        cl, err := client.New(config.GetConfigOrDie(), client.Options{})
×
UNCOV
1034
        if err != nil {
×
1035
                klog.Errorf("failed to create client: %v", err)
×
1036
                return err
×
1037
        }
×
1038

1039
        // migrate deprecated finalizer to new finalizer
1040
        klog.Info("start to sync finalizers")
×
1041
        if err := c.syncIPFinalizer(cl); err != nil {
×
1042
                klog.Errorf("failed to sync ip finalizer: %v", err)
×
1043
                return err
×
1044
        }
×
1045
        if err := c.syncIPPoolFinalizer(cl); err != nil {
×
1046
                klog.Errorf("failed to sync ippool finalizer: %v", err)
×
1047
                return err
×
1048
        }
×
1049
        if err := c.syncOvnDnatFinalizer(cl); err != nil {
×
1050
                klog.Errorf("failed to sync ovn dnat finalizer: %v", err)
×
1051
                return err
×
1052
        }
×
1053
        if err := c.syncOvnEipFinalizer(cl); err != nil {
×
1054
                klog.Errorf("failed to sync ovn eip finalizer: %v", err)
×
1055
                return err
×
1056
        }
×
1057
        if err := c.syncOvnFipFinalizer(cl); err != nil {
×
1058
                klog.Errorf("failed to sync ovn fip finalizer: %v", err)
×
1059
                return err
×
1060
        }
×
1061
        if err := c.syncOvnSnatFinalizer(cl); err != nil {
×
1062
                klog.Errorf("failed to sync ovn snat finalizer: %v", err)
×
1063
                return err
×
1064
        }
×
1065
        if err := c.syncQoSPolicyFinalizer(cl); err != nil {
×
1066
                klog.Errorf("failed to sync qos policy finalizer: %v", err)
×
1067
                return err
×
1068
        }
×
1069
        if err := c.syncSubnetFinalizer(cl); err != nil {
×
1070
                klog.Errorf("failed to sync subnet finalizer: %v", err)
×
1071
                return err
×
1072
        }
×
1073
        if err := c.syncVipFinalizer(cl); err != nil {
×
1074
                klog.Errorf("failed to sync vip finalizer: %v", err)
×
1075
                return err
×
1076
        }
×
1077
        if err := c.syncIptablesEipFinalizer(cl); err != nil {
×
1078
                klog.Errorf("failed to sync iptables eip finalizer: %v", err)
×
1079
                return err
×
1080
        }
×
1081
        if err := c.syncIptablesFipFinalizer(cl); err != nil {
×
1082
                klog.Errorf("failed to sync iptables fip finalizer: %v", err)
×
1083
                return err
×
1084
        }
×
1085
        if err := c.syncIptablesDnatFinalizer(cl); err != nil {
×
1086
                klog.Errorf("failed to sync iptables dnat finalizer: %v", err)
×
1087
                return err
×
1088
        }
×
1089
        if err := c.syncIptablesSnatFinalizer(cl); err != nil {
×
UNCOV
1090
                klog.Errorf("failed to sync iptables snat finalizer: %v", err)
×
UNCOV
1091
                return err
×
UNCOV
1092
        }
×
UNCOV
1093
        klog.Info("sync finalizers done")
×
UNCOV
1094
        return nil
×
1095
}
1096

1097
func hasAllocatedAnnotation(pod *v1.Pod) bool {
1✔
1098
        for key, value := range pod.Annotations {
2✔
1099
                if value == "true" && strings.HasSuffix(key, util.AllocatedAnnotationSuffix) {
2✔
1100
                        return true
1✔
1101
                }
1✔
1102
        }
1103
        return false
1✔
1104
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc