• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 21793992783

08 Feb 2026 06:56AM UTC coverage: 22.895% (+0.008%) from 22.887%
21793992783

push

github

web-flow
fix: go modernize doesn't work (#6262)

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

1 of 3 new or added lines in 3 files covered. (33.33%)

1 existing line in 1 file now uncovered.

12438 of 54327 relevant lines covered (22.89%)

0.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.45
/pkg/controller/vpc_nat_gateway.go
1
package controller
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "errors"
7
        "fmt"
8
        "maps"
9
        "os"
10
        "reflect"
11
        "regexp"
12
        "slices"
13
        "strings"
14
        "time"
15

16
        nadv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
17
        v1 "k8s.io/api/apps/v1"
18
        corev1 "k8s.io/api/core/v1"
19
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
20
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21
        "k8s.io/apimachinery/pkg/labels"
22
        "k8s.io/apimachinery/pkg/types"
23
        "k8s.io/client-go/tools/cache"
24
        "k8s.io/klog/v2"
25
        "k8s.io/utils/ptr"
26

27
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
28
        "github.com/kubeovn/kube-ovn/pkg/request"
29
        "github.com/kubeovn/kube-ovn/pkg/util"
30
)
31

32
// Package-level state shared by the VPC NAT gateway handlers in this file.
var (
	// vpcNatEnabled tracks the feature switch: "unknown" until the first
	// resyncVpcNatGwConfig run, then "true" or "false".
	vpcNatEnabled = "unknown"

	// VpcNatCmVersion is the ResourceVersion of the NAT gateway ConfigMap that
	// was last applied; it is reset to "" when the feature is disabled.
	VpcNatCmVersion string

	// natGwCreatedAT holds a NAT gateway pod creation timestamp formatted as
	// "2006-01-02T15:04:05". The FIP/EIP/SNAT/DNAT handlers compare it with each
	// resource's Status.Redo to decide whether the resource must be re-applied.
	// NOTE(review): it is a single global shared by all gateways, while the
	// handlers only lock per gateway key; concurrent handlers for different
	// gateways may race on it.
	natGwCreatedAT string
)
37

38
// Operation names for the NAT gateway pod.
//
// getIptablesVersion is appended to "bash /kube-ovn/nat-gateway.sh " by the
// getIptablesVersion method. The other names are presumably forwarded the
// same way by execNatGwRules/execNatGwQoS, which are not shown here.
const (
	// gateway initialization
	natGwInit = "init"

	// EIP management
	natGwEipAdd = "eip-add"
	natGwEipDel = "eip-del"

	// DNAT / SNAT rules
	natGwDnatAdd = "dnat-add"
	natGwDnatDel = "dnat-del"
	natGwSnatAdd = "snat-add"
	natGwSnatDel = "snat-del"

	// per-EIP QoS
	natGwEipIngressQoSAdd = "eip-ingress-qos-add"
	natGwEipIngressQoSDel = "eip-ingress-qos-del"
	natGwEipEgressQoSAdd  = "eip-egress-qos-add"
	natGwEipEgressQoSDel  = "eip-egress-qos-del"

	// gateway-level QoS policy
	QoSAdd = "qos-add"
	QoSDel = "qos-del"

	// floating IPs and subnet routes
	natGwSubnetFipAdd   = "floating-ip-add"
	natGwSubnetFipDel   = "floating-ip-del"
	natGwSubnetRouteAdd = "subnet-route-add"
	natGwSubnetRouteDel = "subnet-route-del"

	// diagnostics
	getIptablesVersion = "get-iptables-version"
)
59

60
func (c *Controller) resyncVpcNatGwConfig() {
×
61
        cm, err := c.configMapsLister.ConfigMaps(c.config.PodNamespace).Get(util.VpcNatGatewayConfig)
×
62
        if err != nil && !k8serrors.IsNotFound(err) {
×
63
                klog.Errorf("failed to get ovn-vpc-nat-gw-config, %v", err)
×
64
                return
×
65
        }
×
66

67
        if k8serrors.IsNotFound(err) || cm.Data["enable-vpc-nat-gw"] == "false" {
×
68
                if vpcNatEnabled == "false" {
×
69
                        return
×
70
                }
×
71
                klog.Info("start to clean up vpc nat gateway")
×
72
                if err := c.cleanUpVpcNatGw(); err != nil {
×
73
                        klog.Errorf("failed to clean up vpc nat gateway, %v", err)
×
74
                        return
×
75
                }
×
76
                vpcNatEnabled = "false"
×
77
                VpcNatCmVersion = ""
×
78
                klog.Info("finish clean up vpc nat gateway")
×
79
                return
×
80
        }
81
        if vpcNatEnabled == "true" && VpcNatCmVersion == cm.ResourceVersion {
×
82
                return
×
83
        }
×
84
        gws, err := c.vpcNatGatewayLister.List(labels.Everything())
×
85
        if err != nil {
×
86
                klog.Errorf("failed to get vpc nat gateway, %v", err)
×
87
                return
×
88
        }
×
89
        vpcNatEnabled = "true"
×
90
        VpcNatCmVersion = cm.ResourceVersion
×
91
        for _, gw := range gws {
×
92
                c.addOrUpdateVpcNatGatewayQueue.Add(gw.Name)
×
93
        }
×
94
        klog.Info("finish establishing vpc-nat-gateway")
×
95
}
96

97
func (c *Controller) enqueueAddVpcNatGw(obj any) {
×
98
        key := cache.MetaObjectToName(obj.(*kubeovnv1.VpcNatGateway)).String()
×
99
        klog.V(3).Infof("enqueue add vpc-nat-gw %s", key)
×
100
        c.addOrUpdateVpcNatGatewayQueue.Add(key)
×
101
}
×
102

103
func (c *Controller) enqueueUpdateVpcNatGw(_, newObj any) {
×
104
        key := cache.MetaObjectToName(newObj.(*kubeovnv1.VpcNatGateway)).String()
×
105
        klog.V(3).Infof("enqueue update vpc-nat-gw %s", key)
×
106
        c.addOrUpdateVpcNatGatewayQueue.Add(key)
×
107
}
×
108

109
func (c *Controller) enqueueDeleteVpcNatGw(obj any) {
×
110
        var gw *kubeovnv1.VpcNatGateway
×
111
        switch t := obj.(type) {
×
112
        case *kubeovnv1.VpcNatGateway:
×
113
                gw = t
×
114
        case cache.DeletedFinalStateUnknown:
×
115
                g, ok := t.Obj.(*kubeovnv1.VpcNatGateway)
×
116
                if !ok {
×
117
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
118
                        return
×
119
                }
×
120
                gw = g
×
121
        default:
×
122
                klog.Warningf("unexpected type: %T", obj)
×
123
                return
×
124
        }
125

126
        key := cache.MetaObjectToName(gw).String()
×
127
        klog.V(3).Infof("enqueue del vpc-nat-gw %s", key)
×
128
        c.delVpcNatGatewayQueue.Add(key)
×
129

×
130
        // Trigger QoS Policy reconcile after NatGw is deleted
×
131
        // This allows the QoS Policy to remove its finalizer if no other NatGws are using it
×
132
        if gw.Status.QoSPolicy != "" {
×
133
                c.updateQoSPolicyQueue.Add(gw.Status.QoSPolicy)
×
134
        }
×
135
}
136

137
func (c *Controller) handleDelVpcNatGw(key string) error {
×
138
        c.vpcNatGwKeyMutex.LockKey(key)
×
139
        defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()
×
140

141
        name := util.GenNatGwName(key)
×
142
        klog.Infof("delete vpc nat gw %s", name)
×
143
        if err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).Delete(context.Background(),
×
144
                name, metav1.DeleteOptions{}); err != nil {
×
145
                if k8serrors.IsNotFound(err) {
×
146
                        return nil
×
147
                }
×
148
                klog.Error(err)
×
149
                return err
×
150
        }
151
        return nil
×
152
}
153

154
// isVpcNatGwChanged checks if VpcNatGateway spec fields have changed compared to status.
155
// Note: User-defined annotations (gw.Spec.Annotations) are NOT checked here because
156
// updating StatefulSet Pod template annotations would trigger Pod recreation.
157
// TODO: support hot update of runtime Pod annotations directly via patch
158
func isVpcNatGwChanged(gw *kubeovnv1.VpcNatGateway) bool {
1✔
159
        if !slices.Equal(gw.Spec.ExternalSubnets, gw.Status.ExternalSubnets) {
2✔
160
                return true
1✔
161
        }
1✔
162
        if !slices.Equal(gw.Spec.Selector, gw.Status.Selector) {
2✔
163
                return true
1✔
164
        }
1✔
165
        if !reflect.DeepEqual(gw.Spec.Tolerations, gw.Status.Tolerations) {
2✔
166
                return true
1✔
167
        }
1✔
168
        if !reflect.DeepEqual(gw.Spec.Affinity, gw.Status.Affinity) {
2✔
169
                return true
1✔
170
        }
1✔
171
        return false
1✔
172
}
173

174
func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error {
×
175
        gw, err := c.vpcNatGatewayLister.Get(key)
×
176
        if err != nil {
×
177
                if k8serrors.IsNotFound(err) {
×
178
                        return nil
×
179
                }
×
180
                klog.Error(err)
×
181
                return err
×
182
        }
183

184
        // create nat gw statefulset
185
        c.vpcNatGwKeyMutex.LockKey(key)
×
186
        defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()
×
187
        klog.Infof("handle add/update vpc nat gateway %s", key)
×
188

×
189
        if vpcNatEnabled != "true" {
×
190
                return errors.New("iptables nat gw not enable")
×
191
        }
×
192

193
        if _, err := c.vpcsLister.Get(gw.Spec.Vpc); err != nil {
×
194
                err = fmt.Errorf("failed to get vpc '%s', err: %w", gw.Spec.Vpc, err)
×
195
                klog.Error(err)
×
196
                return err
×
197
        }
×
198
        if _, err := c.subnetsLister.Get(gw.Spec.Subnet); err != nil {
×
199
                err = fmt.Errorf("failed to get subnet '%s', err: %w", gw.Spec.Subnet, err)
×
200
                klog.Error(err)
×
201
                return err
×
202
        }
×
203

204
        var natGwPodContainerRestartCount int32
×
205
        pod, err := c.getNatGwPod(key)
×
206
        if err == nil {
×
207
                for _, containerStatus := range pod.Status.ContainerStatuses {
×
208
                        if containerStatus.Name == "vpc-nat-gw" {
×
209
                                natGwPodContainerRestartCount = containerStatus.RestartCount
×
210
                                break
×
211
                        }
212
                }
213
        }
214
        needRestartRecovery := natGwPodContainerRestartCount > 0
×
215

×
216
        // check or create statefulset
×
217
        needToCreate := false
×
218
        oldSts, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
×
219
                Get(context.Background(), util.GenNatGwName(gw.Name), metav1.GetOptions{})
×
220
        if err != nil {
×
221
                if !k8serrors.IsNotFound(err) {
×
222
                        klog.Error(err)
×
223
                        return err
×
224
                }
×
225
                needToCreate, oldSts = true, nil
×
226
        }
227
        gwChanged := isVpcNatGwChanged(gw)
×
228

×
229
        newSts, err := c.genNatGwStatefulSet(gw, oldSts, natGwPodContainerRestartCount)
×
230
        if err != nil {
×
231
                klog.Error(err)
×
232
                return err
×
233
        }
×
234

235
        // Handle StatefulSet creation (early return - QoS will be handled in init flow)
236
        if needToCreate {
×
237
                if _, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
×
238
                        Create(context.Background(), newSts, metav1.CreateOptions{}); err != nil {
×
239
                        err := fmt.Errorf("failed to create statefulset '%s', err: %w", newSts.Name, err)
×
240
                        klog.Error(err)
×
241
                        return err
×
242
                }
×
243
                if err = c.patchNatGwStatus(key); err != nil {
×
244
                        klog.Errorf("failed to patch nat gw sts status for nat gw %s, %v", key, err)
×
245
                        return err
×
246
                }
×
247
                return nil
×
248
        }
249

250
        // Handle StatefulSet update if needed
251
        // WARNING: This will update STS template directly, which triggers NAT GW Pod recreation.
252
        // TODO: support hot update of runtime Pod annotations directly via patch
253
        if gwChanged || needRestartRecovery {
×
254
                if _, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
×
255
                        Update(context.Background(), newSts, metav1.UpdateOptions{}); err != nil {
×
256
                        err := fmt.Errorf("failed to update statefulset '%s', err: %w", newSts.Name, err)
×
257
                        klog.Error(err)
×
258
                        return err
×
259
                }
×
260
                if err = c.patchNatGwStatus(key); err != nil {
×
261
                        klog.Errorf("failed to patch nat gw sts status for nat gw %s, %v", key, err)
×
262
                        return err
×
263
                }
×
264
        }
265

266
        // Handle QoS update (independent of StatefulSet changes)
267
        if gw.Spec.QoSPolicy != gw.Status.QoSPolicy {
×
268
                if gw.Status.QoSPolicy != "" {
×
269
                        if err = c.execNatGwQoS(gw, gw.Status.QoSPolicy, QoSDel); err != nil {
×
270
                                klog.Errorf("failed to del qos for nat gw %s, %v", key, err)
×
271
                                return err
×
272
                        }
×
273
                }
274
                if gw.Spec.QoSPolicy != "" {
×
275
                        if err = c.execNatGwQoS(gw, gw.Spec.QoSPolicy, QoSAdd); err != nil {
×
276
                                klog.Errorf("failed to add qos for nat gw %s, %v", key, err)
×
277
                                return err
×
278
                        }
×
279
                }
280
                if err := c.updateCrdNatGwLabels(key, gw.Spec.QoSPolicy); err != nil {
×
281
                        err := fmt.Errorf("failed to update nat gw %s: %w", gw.Name, err)
×
282
                        klog.Error(err)
×
283
                        return err
×
284
                }
×
285
                if err = c.patchNatGwQoSStatus(key, gw.Spec.QoSPolicy); err != nil {
×
286
                        klog.Errorf("failed to patch nat gw qos status for nat gw %s, %v", key, err)
×
287
                        return err
×
288
                }
×
289
        }
290

291
        return nil
×
292
}
293

294
func (c *Controller) handleInitVpcNatGw(key string) error {
×
295
        gw, err := c.vpcNatGatewayLister.Get(key)
×
296
        if err != nil {
×
297
                if k8serrors.IsNotFound(err) {
×
298
                        return nil
×
299
                }
×
300
                klog.Error(err)
×
301
                return err
×
302
        }
303

304
        if vpcNatEnabled != "true" {
×
305
                return errors.New("iptables nat gw not enable")
×
306
        }
×
307

308
        c.vpcNatGwKeyMutex.LockKey(key)
×
309
        defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()
×
310
        klog.Infof("handle init vpc nat gateway %s", key)
×
311

×
312
        // subnet for vpc-nat-gw has been checked when create vpc-nat-gw
×
313

×
314
        pod, err := c.getNatGwPod(key)
×
315
        if err != nil {
×
316
                err := fmt.Errorf("failed to get nat gw %s pod: %w", gw.Name, err)
×
317
                klog.Error(err)
×
318
                return err
×
319
        }
×
320

321
        if pod.Status.Phase != corev1.PodRunning {
×
322
                time.Sleep(10 * time.Second)
×
323
                err = fmt.Errorf("failed to init vpc nat gateway %s, pod is not ready", key)
×
324
                klog.Error(err)
×
325
                return err
×
326
        }
×
327

328
        if _, hasInit := pod.Annotations[util.VpcNatGatewayInitAnnotation]; hasInit {
×
329
                return nil
×
330
        }
×
331
        natGwCreatedAT = pod.CreationTimestamp.Format("2006-01-02T15:04:05")
×
332
        klog.V(3).Infof("nat gw pod '%s' inited at %s", key, natGwCreatedAT)
×
333
        // During initialization, when KubeOVN is running on non primary cni mode, we need to ensure the NAT gateway interfaces
×
334
        // are properly configured. We extract the interfaces from the runtime Pod annotations (network-status).
×
335
        var interfaces []string
×
336
        if c.config.EnableNonPrimaryCNI {
×
337
                // extract external nad interface name
×
338
                externalNadNs, externalNadName, nadErr := c.getExternalSubnetNad(gw)
×
339
                if nadErr != nil {
×
340
                        klog.Errorf("failed to get external subnet NAD for gateway %s: %v", gw.Name, nadErr)
×
341
                        return nadErr
×
342
                }
×
343
                networkStatusAnnotations := pod.Annotations[nadv1.NetworkStatusAnnot]
×
344
                externalNadFullName := fmt.Sprintf("%s/%s", externalNadNs, externalNadName)
×
345
                externalNadIfName, err := util.GetNadInterfaceFromNetworkStatusAnnotation(networkStatusAnnotations, externalNadFullName)
×
346
                if err != nil {
×
347
                        klog.Errorf("failed to extract external nad interface name from runtime Pod annotation network-status, %v", err)
×
348
                        return err
×
349
                }
×
350
                // extract vpc nad interface name
351
                providers, err := c.getPodProviders(pod)
×
352
                if err != nil || len(providers) == 0 {
×
353
                        klog.Errorf("failed to get providers for pod %s/%s: %v", pod.Namespace, pod.Name, err)
×
354
                        return fmt.Errorf("failed to get providers for pod %s/%s: %w", pod.Namespace, pod.Name, err)
×
355
                }
×
356
                // if more than one provider exists, use the first one
357
                provider := providers[0]
×
358
                providerParts := strings.Split(provider, ".")
×
359
                if len(providerParts) < 2 {
×
360
                        klog.Errorf("failed to format provider %s for pod %s/%s", provider, pod.Namespace, pod.Name)
×
361
                        return fmt.Errorf("failed to format provider %s parts for pod %s/%s", provider, pod.Namespace, pod.Name)
×
362
                }
×
363
                vpcNadName, vpcNadNamespace := providerParts[0], providerParts[1]
×
364
                vpcNadFullName := fmt.Sprintf("%s/%s", vpcNadNamespace, vpcNadName)
×
365
                vpcNadIfName, err := util.GetNadInterfaceFromNetworkStatusAnnotation(networkStatusAnnotations, vpcNadFullName)
×
366
                if err != nil {
×
367
                        klog.Errorf("failed to extract internal nad interface name from runtime Pod annotation network-status, %v", err)
×
368
                        return err
×
369
                }
×
370

371
                klog.Infof("nat gw pod %s/%s internal nad interface %s, external nad interface %s", pod.Namespace, pod.Name, vpcNadIfName, externalNadIfName)
×
372
                interfaces = []string{
×
373
                        strings.Join([]string{vpcNadIfName, externalNadIfName}, ","),
×
374
                }
×
375
        }
376
        if err = c.execNatGwRules(pod, natGwInit, interfaces); err != nil {
×
377
                // Check if this is a transient initialization error (e.g., first attempt before iptables chains are created)
×
378
                // The init script may fail on first run but succeed on retry after chains are established
×
379
                klog.Warningf("vpc nat gateway %s init attempt failed (will retry): %v", key, err)
×
380
                return fmt.Errorf("failed to init vpc nat gateway, %w", err)
×
381
        }
×
382

383
        if gw.Spec.QoSPolicy != "" {
×
384
                if err = c.execNatGwQoS(gw, gw.Spec.QoSPolicy, QoSAdd); err != nil {
×
385
                        klog.Errorf("failed to add qos for nat gw %s, %v", key, err)
×
386
                        return err
×
387
                }
×
388
        }
389
        // if update qos success, will update nat gw status
390
        if gw.Spec.QoSPolicy != gw.Status.QoSPolicy {
×
391
                if err = c.patchNatGwQoSStatus(key, gw.Spec.QoSPolicy); err != nil {
×
392
                        klog.Errorf("failed to patch status for nat gw %s, %v", key, err)
×
393
                        return err
×
394
                }
×
395
        }
396

397
        if err := c.updateCrdNatGwLabels(gw.Name, gw.Spec.QoSPolicy); err != nil {
×
398
                err := fmt.Errorf("failed to update nat gw %s: %w", gw.Name, err)
×
399
                klog.Error(err)
×
400
                return err
×
401
        }
×
402

403
        c.updateVpcFloatingIPQueue.Add(key)
×
404
        c.updateVpcDnatQueue.Add(key)
×
405
        c.updateVpcSnatQueue.Add(key)
×
406
        c.updateVpcSubnetQueue.Add(key)
×
407
        c.updateVpcEipQueue.Add(key)
×
408

×
409
        patch := util.KVPatch{util.VpcNatGatewayInitAnnotation: "true"}
×
410
        if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(pod.Namespace), pod.Name, patch); err != nil {
×
411
                err := fmt.Errorf("failed to patch pod %s/%s: %w", pod.Namespace, pod.Name, err)
×
412
                klog.Error(err)
×
413
                return err
×
414
        }
×
415
        return nil
×
416
}
417

418
func (c *Controller) handleUpdateVpcFloatingIP(natGwKey string) error {
×
419
        if vpcNatEnabled != "true" {
×
420
                return errors.New("iptables nat gw not enable")
×
421
        }
×
422

423
        c.vpcNatGwKeyMutex.LockKey(natGwKey)
×
424
        defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(natGwKey) }()
×
425
        klog.Infof("handle update vpc fip %s", natGwKey)
×
426

×
427
        // refresh exist fips
×
428
        if err := c.initCreateAt(natGwKey); err != nil {
×
429
                err = fmt.Errorf("failed to init nat gw pod '%s' create at, %w", natGwKey, err)
×
430
                klog.Error(err)
×
431
                return err
×
432
        }
×
433

434
        fips, err := c.iptablesFipsLister.List(labels.SelectorFromSet(labels.Set{util.VpcNatGatewayNameLabel: natGwKey}))
×
435
        if err != nil {
×
436
                err := fmt.Errorf("failed to get all fips, %w", err)
×
437
                klog.Error(err)
×
438
                return err
×
439
        }
×
440

441
        for _, fip := range fips {
×
442
                if fip.Status.Redo != natGwCreatedAT {
×
443
                        klog.V(3).Infof("redo fip %s", fip.Name)
×
444
                        if err = c.redoFip(fip.Name, natGwCreatedAT, false); err != nil {
×
445
                                klog.Errorf("failed to update eip '%s' to re-apply, %v", fip.Spec.EIP, err)
×
446
                                return err
×
447
                        }
×
448
                }
449
        }
450
        return nil
×
451
}
452

453
func (c *Controller) handleUpdateVpcEip(natGwKey string) error {
×
454
        if vpcNatEnabled != "true" {
×
455
                return errors.New("iptables nat gw not enable")
×
456
        }
×
457

458
        c.vpcNatGwKeyMutex.LockKey(natGwKey)
×
459
        defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(natGwKey) }()
×
460
        klog.Infof("handle update vpc eip %s", natGwKey)
×
461

×
462
        // refresh exist fips
×
463
        if err := c.initCreateAt(natGwKey); err != nil {
×
464
                err = fmt.Errorf("failed to init nat gw pod '%s' create at, %w", natGwKey, err)
×
465
                klog.Error(err)
×
466
                return err
×
467
        }
×
468
        eips, err := c.iptablesEipsLister.List(labels.Everything())
×
469
        if err != nil {
×
470
                err = fmt.Errorf("failed to get eip list, %w", err)
×
471
                klog.Error(err)
×
472
                return err
×
473
        }
×
474
        for _, eip := range eips {
×
475
                if eip.Spec.NatGwDp == natGwKey && eip.Status.Redo != natGwCreatedAT {
×
476
                        klog.V(3).Infof("redo eip %s", eip.Name)
×
477
                        if err = c.patchEipStatus(eip.Name, "", natGwCreatedAT, "", false); err != nil {
×
478
                                klog.Errorf("failed to update eip '%s' to re-apply, %v", eip.Name, err)
×
479
                                return err
×
480
                        }
×
481
                }
482
        }
483
        return nil
×
484
}
485

486
func (c *Controller) handleUpdateVpcSnat(natGwKey string) error {
×
487
        if vpcNatEnabled != "true" {
×
488
                return errors.New("iptables nat gw not enable")
×
489
        }
×
490

491
        c.vpcNatGwKeyMutex.LockKey(natGwKey)
×
492
        defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(natGwKey) }()
×
493
        klog.Infof("handle update vpc snat %s", natGwKey)
×
494

×
495
        // refresh exist snats
×
496
        if err := c.initCreateAt(natGwKey); err != nil {
×
497
                err = fmt.Errorf("failed to init nat gw pod '%s' create at, %w", natGwKey, err)
×
498
                klog.Error(err)
×
499
                return err
×
500
        }
×
501
        snats, err := c.iptablesSnatRulesLister.List(labels.SelectorFromSet(labels.Set{util.VpcNatGatewayNameLabel: natGwKey}))
×
502
        if err != nil {
×
503
                err = fmt.Errorf("failed to get all snats, %w", err)
×
504
                klog.Error(err)
×
505
                return err
×
506
        }
×
507
        for _, snat := range snats {
×
508
                if snat.Status.Redo != natGwCreatedAT {
×
509
                        klog.V(3).Infof("redo snat %s", snat.Name)
×
510
                        if err = c.redoSnat(snat.Name, natGwCreatedAT, false); err != nil {
×
511
                                err = fmt.Errorf("failed to update eip '%s' to re-apply, %w", snat.Spec.EIP, err)
×
512
                                klog.Error(err)
×
513
                                return err
×
514
                        }
×
515
                }
516
        }
517
        return nil
×
518
}
519

520
func (c *Controller) handleUpdateVpcDnat(natGwKey string) error {
×
521
        if vpcNatEnabled != "true" {
×
522
                return errors.New("iptables nat gw not enable")
×
523
        }
×
524

525
        c.vpcNatGwKeyMutex.LockKey(natGwKey)
×
526
        defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(natGwKey) }()
×
527
        klog.Infof("handle update vpc dnat %s", natGwKey)
×
528

×
529
        // refresh exist dnats
×
530
        if err := c.initCreateAt(natGwKey); err != nil {
×
531
                err = fmt.Errorf("failed to init nat gw pod '%s' create at, %w", natGwKey, err)
×
532
                klog.Error(err)
×
533
                return err
×
534
        }
×
535

536
        dnats, err := c.iptablesDnatRulesLister.List(labels.SelectorFromSet(labels.Set{util.VpcNatGatewayNameLabel: natGwKey}))
×
537
        if err != nil {
×
538
                err = fmt.Errorf("failed to get all dnats, %w", err)
×
539
                klog.Error(err)
×
540
                return err
×
541
        }
×
542
        for _, dnat := range dnats {
×
543
                if dnat.Status.Redo != natGwCreatedAT {
×
544
                        klog.V(3).Infof("redo dnat %s", dnat.Name)
×
545
                        if err = c.redoDnat(dnat.Name, natGwCreatedAT, false); err != nil {
×
546
                                err := fmt.Errorf("failed to update dnat '%s' to redo, %w", dnat.Name, err)
×
547
                                klog.Error(err)
×
548
                                return err
×
549
                        }
×
550
                }
551
        }
552
        return nil
×
553
}
554

555
func (c *Controller) getIptablesVersion(pod *corev1.Pod) (version string, err error) {
×
556
        operation := getIptablesVersion
×
557
        cmd := "bash /kube-ovn/nat-gateway.sh " + operation
×
558
        klog.V(3).Info(cmd)
×
559
        stdOutput, errOutput, err := util.ExecuteCommandInContainer(c.config.KubeClient, c.config.KubeRestConfig, pod.Namespace, pod.Name, "vpc-nat-gw", []string{"/bin/bash", "-c", cmd}...)
×
560
        if err != nil {
×
561
                if len(errOutput) > 0 {
×
562
                        klog.Errorf("failed to ExecuteCommandInContainer, errOutput: %v", errOutput)
×
563
                }
×
564
                if len(stdOutput) > 0 {
×
565
                        klog.V(3).Infof("failed to ExecuteCommandInContainer, stdOutput: %v", stdOutput)
×
566
                }
×
567
                klog.Error(err)
×
568
                return "", err
×
569
        }
570

571
        if len(stdOutput) > 0 {
×
572
                klog.V(3).Infof("ExecuteCommandInContainer stdOutput: %v", stdOutput)
×
573
        }
×
574

575
        if len(errOutput) > 0 {
×
576
                klog.Errorf("failed to ExecuteCommandInContainer errOutput: %v", errOutput)
×
577
                return "", err
×
578
        }
×
579

580
        versionMatcher := regexp.MustCompile(`v([0-9]+(\.[0-9]+)+)`)
×
581
        match := versionMatcher.FindStringSubmatch(stdOutput)
×
582
        if match == nil {
×
583
                return "", fmt.Errorf("no iptables version found in string: %s", stdOutput)
×
584
        }
×
585
        return match[1], nil
×
586
}
587

588
func (c *Controller) handleUpdateNatGwSubnetRoute(natGwKey string) error {
×
589
        gw, err := c.vpcNatGatewayLister.Get(natGwKey)
×
590
        if err != nil {
×
591
                if k8serrors.IsNotFound(err) {
×
592
                        return nil
×
593
                }
×
594
                klog.Error(err)
×
595
                return err
×
596
        }
597

598
        if vpcNatEnabled != "true" {
×
599
                return errors.New("iptables nat gw not enable")
×
600
        }
×
601

602
        c.vpcNatGwKeyMutex.LockKey(natGwKey)
×
603
        defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(natGwKey) }()
×
604
        klog.Infof("handle update subnet route for nat gateway %s", natGwKey)
×
605

×
606
        pod, err := c.getNatGwPod(natGwKey)
×
607
        if err != nil {
×
608
                err = fmt.Errorf("failed to get nat gw '%s' pod, %w", natGwKey, err)
×
609
                klog.Error(err)
×
610
                return err
×
611
        }
×
612

613
        v4InternalGw, _, err := c.GetGwBySubnet(gw.Spec.Subnet)
×
614
        if err != nil {
×
615
                err = fmt.Errorf("failed to get gw, err: %w", err)
×
616
                klog.Error(err)
×
617
                return err
×
618
        }
×
619
        vpc, err := c.vpcsLister.Get(gw.Spec.Vpc)
×
620
        if err != nil {
×
621
                err = fmt.Errorf("failed to get vpc, err: %w", err)
×
622
                klog.Error(err)
×
623
                return err
×
624
        }
×
625

626
        // update route table
627
        var newCIDRS, oldCIDRs, toBeDelCIDRs []string
×
628
        // Map of subnet provider to CIDRs, used to generate/update runtime Pod annotations
×
629
        newProviderCIDRMap := make(map[string][]string)
×
630

×
631
        if len(vpc.Status.Subnets) > 0 {
×
632
                for _, s := range vpc.Status.Subnets {
×
633
                        subnet, err := c.subnetsLister.Get(s)
×
634
                        if err != nil {
×
635
                                err = fmt.Errorf("failed to get subnet, err: %w", err)
×
636
                                klog.Error(err)
×
637
                                return err
×
638
                        }
×
639
                        if subnet.Spec.Vlan != "" && !subnet.Spec.U2OInterconnection {
×
640
                                continue
×
641
                        }
642
                        if !isOvnSubnet(subnet) || !subnet.Status.IsValidated() {
×
643
                                continue
×
644
                        }
645
                        if v4Cidr, _ := util.SplitStringIP(subnet.Spec.CIDRBlock); v4Cidr != "" {
×
646
                                newCIDRS = append(newCIDRS, v4Cidr)
×
647
                                // Store the provider and CIDR for later use to update runtime Pod annotations
×
648
                                newProviderCIDRMap[subnet.Spec.Provider] = append(newProviderCIDRMap[subnet.Spec.Provider], v4Cidr)
×
649
                        }
×
650
                }
651
        }
652
        // Get all the CIDRs that are already in the runtime Pod annotations
653
        for annotation, value := range pod.Annotations {
×
654
                if strings.Contains(annotation, ".kubernetes.io/vpc_cidrs") {
×
655
                        var existingCIDR []string
×
656
                        if err = json.Unmarshal([]byte(value), &existingCIDR); err != nil {
×
657
                                klog.Error(err)
×
658
                                return err
×
659
                        }
×
660
                        // Defense in depth: validate CIDR format before using in shell commands
661
                        for _, cidr := range existingCIDR {
×
662
                                if err = util.CheckCidrs(cidr); err != nil {
×
663
                                        klog.Warningf("skipping invalid CIDR %q from annotation %q: %v", cidr, annotation, err)
×
664
                                        continue
×
665
                                }
666
                                oldCIDRs = append(oldCIDRs, cidr)
×
667
                        }
668
                }
669
        }
670
        for _, old := range oldCIDRs {
×
671
                if !slices.Contains(newCIDRS, old) {
×
672
                        toBeDelCIDRs = append(toBeDelCIDRs, old)
×
673
                }
×
674
        }
675

676
        if len(newCIDRS) > 0 {
×
677
                var rules []string
×
678
                for _, cidr := range newCIDRS {
×
679
                        if !util.CIDRContainIP(cidr, v4InternalGw) {
×
680
                                rules = append(rules, fmt.Sprintf("%s,%s", cidr, v4InternalGw))
×
681
                        }
×
682
                }
683
                if len(rules) > 0 {
×
684
                        if err = c.execNatGwRules(pod, natGwSubnetRouteAdd, rules); err != nil {
×
685
                                err = fmt.Errorf("failed to exec nat gateway rule, err: %w", err)
×
686
                                klog.Error(err)
×
687
                                return err
×
688
                        }
×
689
                }
690
        }
691

692
        if len(toBeDelCIDRs) > 0 {
×
693
                for _, cidr := range toBeDelCIDRs {
×
694
                        if err = c.execNatGwRules(pod, natGwSubnetRouteDel, []string{cidr}); err != nil {
×
695
                                err = fmt.Errorf("failed to exec nat gateway rule, err: %w", err)
×
696
                                klog.Error(err)
×
697
                                return err
×
698
                        }
×
699
                }
700
        }
701

702
        // Generate runtime Pod annotations for vpc_cidrs (one per subnet provider)
703
        patch := util.KVPatch{}
×
704

×
705
        // Track existing vpc_cidrs runtime Pod annotations to identify stale ones
×
706
        existingProviders := make(map[string]bool)
×
707
        for annotation := range pod.Annotations {
×
708
                if strings.Contains(annotation, ".kubernetes.io/vpc_cidrs") {
×
709
                        // Extract provider name from annotation key: <provider>.kubernetes.io/vpc_cidrs
×
710
                        parts := strings.Split(annotation, ".kubernetes.io/vpc_cidrs")
×
711
                        if len(parts) == 2 && parts[1] == "" {
×
712
                                provider := parts[0]
×
713
                                existingProviders[provider] = true
×
714
                        }
×
715
                }
716
        }
717

718
        // Add/update runtime Pod annotations for current providers
719
        for provider, cidrs := range newProviderCIDRMap {
×
720
                cidrBytes, err := json.Marshal(cidrs)
×
721
                if err != nil {
×
722
                        klog.Errorf("marshal vpc_cidrs annotation failed %v", err)
×
723
                        return err
×
724
                }
×
725
                patch[fmt.Sprintf(util.VpcCIDRsAnnotationTemplate, provider)] = string(cidrBytes)
×
726
                // Mark this provider as still active
×
727
                delete(existingProviders, provider)
×
728
        }
729

730
        // Remove stale runtime Pod annotations for providers no longer associated with the VPC
731
        for provider := range existingProviders {
×
732
                patch[fmt.Sprintf(util.VpcCIDRsAnnotationTemplate, provider)] = nil
×
733
                klog.V(3).Infof("Removing stale vpc_cidrs runtime annotation for provider %s from pod %s/%s", provider, pod.Namespace, pod.Name)
×
734
        }
×
735

736
        // Only patch if there are changes to make
737
        if len(patch) > 0 {
×
738
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(pod.Namespace), pod.Name, patch); err != nil {
×
739
                        err = fmt.Errorf("failed to patch pod %s/%s: %w", pod.Namespace, pod.Name, err)
×
740
                        klog.Error(err)
×
741
                        return err
×
742
                }
×
743
                klog.V(3).Infof("Successfully patched %d vpc_cidrs annotations on pod %s/%s", len(patch), pod.Namespace, pod.Name)
×
744
        }
745

746
        return nil
×
747
}
748

749
// TODO: Refactor to avoid shell command injection vulnerability.
750
// Current implementation uses "bash -c" with string concatenation, which could be exploited
751
// if any element in the rules slice contains shell metacharacters.
752
// Recommended fix: Pass arguments directly as a slice instead of joining them into a shell command:
753
//
754
//        args := append([]string{"/kube-ovn/nat-gateway.sh", operation}, rules...)
755
//        util.ExecuteCommandInContainer(..., args...)
756
//
757
// This requires updating nat-gateway.sh to accept arguments via $@ instead of parsing a single string.
758
// Current risk is mitigated by CIDR format validation on all data sources reaching this function.
759
func (c *Controller) execNatGwRules(pod *corev1.Pod, operation string, rules []string) error {
×
760
        lockKey := fmt.Sprintf("nat-gw-exec:%s/%s", pod.Namespace, pod.Name)
×
761

×
762
        c.vpcNatGwExecKeyMutex.LockKey(lockKey)
×
763
        defer func() {
×
764
                _ = c.vpcNatGwExecKeyMutex.UnlockKey(lockKey)
×
765
        }()
×
766

767
        cmd := fmt.Sprintf("bash /kube-ovn/nat-gateway.sh %s %s", operation, strings.Join(rules, " "))
×
768
        klog.V(3).Infof("executing NAT gateway command: %s", cmd)
×
769
        stdOutput, errOutput, err := util.ExecuteCommandInContainer(c.config.KubeClient, c.config.KubeRestConfig, pod.Namespace, pod.Name, "vpc-nat-gw", []string{"/bin/bash", "-c", cmd}...)
×
770
        if err != nil {
×
771
                if len(errOutput) > 0 {
×
772
                        klog.Errorf("NAT gateway command failed - stderr: %v", errOutput)
×
773
                }
×
774
                if len(stdOutput) > 0 {
×
775
                        klog.Infof("NAT gateway command failed - stdout: %v", stdOutput)
×
776
                }
×
777
                klog.Errorf("NAT gateway command execution error: %v", err)
×
778
                return err
×
779
        }
780

781
        if len(stdOutput) > 0 {
×
782
                klog.V(3).Infof("NAT gateway command succeeded - stdout: %v", stdOutput)
×
783
        }
×
784

785
        if len(errOutput) > 0 {
×
786
                // tc commands may output warnings to stderr (e.g., "Warning: sch_htb: quantum of class is big")
×
787
                // Filter out lines that are only warnings, but preserve actual errors
×
788
                lines := strings.Split(errOutput, "\n")
×
789
                var errorLines []string
×
790
                for _, line := range lines {
×
791
                        trimmedLine := strings.TrimSpace(line)
×
792
                        if trimmedLine == "" {
×
793
                                continue
×
794
                        }
795
                        // Skip lines that are just warnings
796
                        if strings.HasPrefix(trimmedLine, "Warning:") {
×
797
                                klog.Warningf("NAT gateway command warning: %v", trimmedLine)
×
798
                                continue
×
799
                        }
800
                        errorLines = append(errorLines, trimmedLine)
×
801
                }
802
                // If there are actual error lines (not just warnings), return error
803
                if len(errorLines) > 0 {
×
804
                        errMsg := strings.Join(errorLines, "; ")
×
805
                        klog.Errorf("failed to ExecuteCommandInContainer errOutput: %v", errMsg)
×
806
                        return errors.New(errMsg)
×
807
                }
×
808
        }
809
        return nil
×
810
}
811

812
// setNatGwAPIAccess modifies StatefulSet Pod template annotations to add an interface with API access to the NAT gateway.
813
// It attaches the standard externalNetwork to the gateway via a NetworkAttachmentDefinition (NAD) with a provider
814
// corresponding to one that is configured on a subnet part of the default VPC (the K8S apiserver runs in the default VPC).
815
func (c *Controller) setNatGwAPIAccess(annotations map[string]string) error {
×
816
        // Check the NetworkAttachmentDefinition provider exists, must be user-configured
×
817
        if vpcNatAPINadProvider == "" {
×
818
                return errors.New("no NetworkAttachmentDefinition provided to access apiserver, check configmap ovn-vpc-nat-config and field 'apiNadProvider'")
×
819
        }
×
820

821
        // Subdivide provider so we can infer the name of the NetworkAttachmentDefinition
822
        providerSplit := strings.Split(vpcNatAPINadProvider, ".")
×
823
        if len(providerSplit) != 3 || providerSplit[2] != util.OvnProvider {
×
824
                return fmt.Errorf("name of the provider must have syntax 'name.namespace.ovn', got %s", vpcNatAPINadProvider)
×
825
        }
×
826

827
        // Extract the name of the provider and its namespace
828
        name, namespace := providerSplit[0], providerSplit[1]
×
829

×
830
        // Craft the name of the NAD for the externalNetwork and the apiNetwork
×
831
        networkAttachments := []string{fmt.Sprintf("%s/%s", namespace, name)}
×
832
        if externalNetworkAttachment, ok := annotations[nadv1.NetworkAttachmentAnnot]; ok {
×
833
                networkAttachments = append([]string{externalNetworkAttachment}, networkAttachments...)
×
834
        }
×
835

836
        // Attach the NADs to the Pod by adding them to the special annotation
837
        annotations[nadv1.NetworkAttachmentAnnot] = strings.Join(networkAttachments, ",")
×
838

×
839
        // Set the network route to the API, so we can reach it
×
840
        return c.setNatGwAPIRoute(annotations, namespace, name)
×
841
}
842

843
// setNatGwAPIRoute modifies StatefulSet Pod template annotations to add routes for reaching the K8S API server.
844
func (c *Controller) setNatGwAPIRoute(annotations map[string]string, nadNamespace, nadName string) error {
×
845
        dst := os.Getenv(util.EnvKubernetesServiceHost)
×
846

×
847
        protocol := util.CheckProtocol(dst)
×
848
        if !strings.ContainsRune(dst, '/') {
×
849
                switch protocol {
×
850
                case kubeovnv1.ProtocolIPv4:
×
851
                        dst += "/32"
×
852
                case kubeovnv1.ProtocolIPv6:
×
853
                        dst += "/128"
×
854
                }
855
        }
856

857
        // Retrieve every subnet on the cluster
858
        subnets, err := c.subnetsLister.List(labels.Everything())
×
859
        if err != nil {
×
860
                return fmt.Errorf("failed to list subnets: %w", err)
×
861
        }
×
862

863
        // Retrieve the subnet connected to the NAD, this subnet should be in the VPC of the API
864
        apiSubnet, err := c.findSubnetByNetworkAttachmentDefinition(nadNamespace, nadName, subnets)
×
865
        if err != nil {
×
866
                return fmt.Errorf("failed to find api subnet using the nad %s/%s: %w", nadNamespace, nadName, err)
×
867
        }
×
868

869
        // Craft the route to reach the API from the subnet we've just retrieved
870
        for gw := range strings.SplitSeq(apiSubnet.Spec.Gateway, ",") {
×
871
                if util.CheckProtocol(gw) == protocol {
×
872
                        routes := []request.Route{{Destination: dst, Gateway: gw}}
×
873
                        buf, err := json.Marshal(routes)
×
874
                        if err != nil {
×
875
                                return fmt.Errorf("failed to marshal routes %+v: %w", routes, err)
×
876
                        }
×
877

878
                        annotations[fmt.Sprintf(util.RoutesAnnotationTemplate, vpcNatAPINadProvider)] = string(buf)
×
879
                        break
×
880
                }
881
        }
882

883
        return nil
×
884
}
885

886
func (c *Controller) GetSubnetProvider(subnetName string) (string, error) {
1✔
887
        subnet, err := c.subnetsLister.Get(subnetName)
1✔
888
        if err != nil {
2✔
889
                return "", fmt.Errorf("failed to get subnet %s: %w", subnetName, err)
1✔
890
        }
1✔
891
        // Make sure the subnet is an OVN subnet
892
        if !isOvnSubnet(subnet) {
1✔
893
                return "", fmt.Errorf("subnet %s is not an OVN subnet", subnetName)
×
894
        }
×
895
        return subnet.Spec.Provider, nil
1✔
896
}
897

898
func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1.StatefulSet, natGwPodContainerRestartCount int32) (*v1.StatefulSet, error) {
×
899
        externalNadNamespace, externalNadName, err := c.getExternalSubnetNad(gw)
×
900
        if err != nil {
×
901
                klog.Errorf("failed to get gw external subnet nad: %v", err)
×
902
                return nil, err
×
903
        }
×
904

905
        eth0SubnetProvider, err := c.GetSubnetProvider(gw.Spec.Subnet)
×
906
        if err != nil {
×
907
                klog.Errorf("failed to get gw eth0 valid subnet provider: %v", err)
×
908
                return nil, err
×
909
        }
×
910

911
        // Get additional networks specified by user in VpcNatGateway CR metadata.annotations (for secondary CNI mode)
912
        // TODO: the EnableNonPrimaryCNI check may not be necessary, as additional NADs could also
913
        // be useful in primary CNI mode. Consider removing this condition in the future.
914
        var additionalNetworks string
×
915
        if c.config.EnableNonPrimaryCNI && gw.Annotations != nil {
×
916
                additionalNetworks = gw.Annotations[nadv1.NetworkAttachmentAnnot]
×
917
        }
×
918

919
        // Generate StatefulSet Pod template annotations.
920
        // User-defined annotations (gw.Spec.Annotations) are used as base, system annotations are set on top.
921
        templateAnnotations, err := util.GenNatGwPodAnnotations(gw.Spec.Annotations, gw, externalNadNamespace, externalNadName, eth0SubnetProvider, additionalNetworks)
×
922
        if err != nil {
×
923
                klog.Errorf("vpc nat gateway annotation generation failed: %s", err.Error())
×
924
                return nil, err
×
925
        }
×
926

927
        // Restart logic to fix #5072
928
        if oldSts != nil && len(oldSts.Spec.Template.Annotations) != 0 {
×
929
                if _, ok := oldSts.Spec.Template.Annotations[util.VpcNatGatewayContainerRestartAnnotation]; !ok && natGwPodContainerRestartCount > 0 {
×
930
                        templateAnnotations[util.VpcNatGatewayContainerRestartAnnotation] = ""
×
931
                }
×
932
        }
933
        klog.V(3).Infof("%s templateAnnotations:%v", gw.Name, templateAnnotations)
×
934

×
935
        // Add an interface that can reach the API server, we need access to it to probe Kube-OVN resources
×
936
        if gw.Spec.BgpSpeaker.Enabled {
×
937
                if err := c.setNatGwAPIAccess(templateAnnotations); err != nil {
×
938
                        klog.Errorf("couldn't add an API interface to the NAT gateway: %v", err)
×
939
                        return nil, err
×
940
                }
×
941
        }
942

943
        // Retrieve all subnets in existence
944
        subnets, err := c.subnetsLister.List(labels.Everything())
×
945
        if err != nil {
×
946
                klog.Errorf("failed to list subnets: %v", err)
×
947
                return nil, err
×
948
        }
×
949

950
        // Configure eth0 (OVN internal network) routes
951
        // Retrieve the gateways of the subnet sitting behind the NAT gateway
952
        eth0V4Gateway, eth0V6Gateway, err := c.GetGwBySubnet(gw.Spec.Subnet)
×
953
        if err != nil {
×
954
                klog.Errorf("failed to get gateway ips for subnet %s: %v", gw.Spec.Subnet, err)
×
955
                return nil, err
×
956
        }
×
957

958
        // Add routes to join the services (is this still needed?)
959
        // It seems like the script inside the NAT GW already does that
960
        v4ClusterIPRange, v6ClusterIPRange := util.SplitStringIP(c.config.ServiceClusterIPRange)
×
961
        routes := make([]request.Route, 0, 2)
×
962
        if eth0V4Gateway != "" && v4ClusterIPRange != "" {
×
963
                routes = append(routes, request.Route{Destination: v4ClusterIPRange, Gateway: eth0V4Gateway})
×
964
        }
×
965
        if eth0V6Gateway != "" && v6ClusterIPRange != "" {
×
966
                routes = append(routes, request.Route{Destination: v6ClusterIPRange, Gateway: eth0V6Gateway})
×
967
        }
×
968

969
        // Add gateway to join every subnet in the same VPC? (is this still needed?)
970
        // Are we trying to give the NAT gateway access to every subnet in the VPC?
971
        // I suspect this is to solve a problem where a static route is inserted to redirect all the traffic
972
        // from a VPC into the NAT GW. When that happens, the GW has no return path to the other subnets.
973
        for _, subnet := range subnets {
×
974
                if subnet.Spec.Vpc != gw.Spec.Vpc || subnet.Name == gw.Spec.Subnet ||
×
975
                        !isOvnSubnet(subnet) || !subnet.Status.IsValidated() ||
×
976
                        (subnet.Spec.Vlan != "" && !subnet.Spec.U2OInterconnection) {
×
977
                        continue
×
978
                }
979
                cidrV4, cidrV6 := util.SplitStringIP(subnet.Spec.CIDRBlock)
×
980
                if cidrV4 != "" && eth0V4Gateway != "" {
×
981
                        routes = append(routes, request.Route{Destination: cidrV4, Gateway: eth0V4Gateway})
×
982
                }
×
983
                if cidrV6 != "" && eth0V6Gateway != "" {
×
984
                        routes = append(routes, request.Route{Destination: cidrV6, Gateway: eth0V6Gateway})
×
985
                }
×
986
        }
987

988
        // Users can specify custom routes to inject in the NAT GW
989
        for _, route := range gw.Spec.Routes {
×
990
                nexthop := route.NextHopIP
×
991

×
992
                // Users can specify "gateway" instead of an actual IP as the next hop, and
×
993
                // we will auto-determine the address of the gateway based on the protocol
×
994
                if nexthop == "gateway" {
×
995
                        if util.CheckProtocol(route.CIDR) == kubeovnv1.ProtocolIPv4 {
×
996
                                nexthop = eth0V4Gateway
×
997
                        } else {
×
998
                                nexthop = eth0V6Gateway
×
999
                        }
×
1000
                }
1001

1002
                routes = append(routes, request.Route{Destination: route.CIDR, Gateway: nexthop})
×
1003
        }
1004

1005
        if err = setPodRoutesAnnotation(templateAnnotations, eth0SubnetProvider, routes); err != nil {
×
1006
                klog.Error(err)
×
1007
                return nil, err
×
1008
        }
×
1009

1010
        // Set the default routes to the external network
1011
        net1Subnet, err := c.subnetsLister.Get(util.GetNatGwExternalNetwork(gw.Spec.ExternalSubnets))
×
1012
        if err != nil {
×
1013
                klog.Error(err)
×
1014
                return nil, err
×
1015
        }
×
1016

1017
        routes = routes[0:0]
×
1018
        net1V4Gateway, net1V6Gateway := util.SplitStringIP(net1Subnet.Spec.Gateway)
×
1019
        if net1V4Gateway != "" {
×
1020
                routes = append(routes, request.Route{Destination: "0.0.0.0/0", Gateway: net1V4Gateway})
×
1021
        }
×
1022
        if net1V6Gateway != "" {
×
1023
                routes = append(routes, request.Route{Destination: "::/0", Gateway: net1V6Gateway})
×
1024
        }
×
1025
        // TODO:// check NAD if has ipam to disable ipam
1026
        if !gw.Spec.NoDefaultEIP {
×
1027
                if err = setPodRoutesAnnotation(templateAnnotations, net1Subnet.Spec.Provider, routes); err != nil {
×
1028
                        klog.Error(err)
×
1029
                        return nil, err
×
1030
                }
×
1031
        } else {
×
1032
                // NAT gateway uses no-IPAM mode in network attachment definition when NoDefaultEIP is enabled
×
1033
                // This allows macvlan/other CNI plugins to work without IP allocation from Kube-OVN
×
1034
                klog.Infof("skipping IP allocation for NAT gateway %s (NoDefaultEIP enabled)", gw.Name)
×
1035
                templateAnnotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, net1Subnet.Spec.Provider)] = "true"
×
1036
        }
×
1037

1038
        selectors := util.GenNatGwSelectors(gw.Spec.Selector)
×
1039
        klog.V(3).Infof("prepare for vpc nat gateway pod, node selector: %v", selectors)
×
1040

×
1041
        labels := util.GenNatGwLabels(gw.Name)
×
1042

×
1043
        sts := &v1.StatefulSet{
×
1044
                ObjectMeta: metav1.ObjectMeta{
×
1045
                        Name:   util.GenNatGwName(gw.Name),
×
1046
                        Labels: labels,
×
1047
                },
×
1048
                Spec: v1.StatefulSetSpec{
×
1049
                        Replicas: ptr.To(int32(1)),
×
1050
                        Selector: &metav1.LabelSelector{
×
1051
                                MatchLabels: labels,
×
1052
                        },
×
1053
                        Template: corev1.PodTemplateSpec{
×
1054
                                ObjectMeta: metav1.ObjectMeta{
×
1055
                                        Labels:      labels,
×
1056
                                        Annotations: templateAnnotations,
×
1057
                                },
×
1058
                                Spec: corev1.PodSpec{
×
1059
                                        TerminationGracePeriodSeconds: ptr.To(int64(0)),
×
1060
                                        Containers: []corev1.Container{
×
1061
                                                {
×
1062
                                                        Name:    "vpc-nat-gw",
×
1063
                                                        Image:   vpcNatImage,
×
1064
                                                        Command: []string{"sleep", "infinity"},
×
1065
                                                        Lifecycle: &corev1.Lifecycle{
×
1066
                                                                PostStart: &corev1.LifecycleHandler{
×
1067
                                                                        Exec: &corev1.ExecAction{
×
1068
                                                                                Command: []string{"sh", "-c", "sysctl -w net.ipv4.ip_forward=1"},
×
1069
                                                                        },
×
1070
                                                                },
×
1071
                                                        },
×
1072
                                                        ImagePullPolicy: corev1.PullIfNotPresent,
×
1073
                                                        Env: []corev1.EnvVar{
×
1074
                                                                {
×
1075
                                                                        Name:  "GATEWAY_V4",
×
1076
                                                                        Value: net1V4Gateway,
×
1077
                                                                },
×
1078
                                                                {
×
1079
                                                                        Name:  "GATEWAY_V6",
×
1080
                                                                        Value: net1V6Gateway,
×
1081
                                                                },
×
1082
                                                        },
×
1083
                                                        SecurityContext: &corev1.SecurityContext{
×
1084
                                                                Privileged:               ptr.To(true),
×
1085
                                                                AllowPrivilegeEscalation: ptr.To(true),
×
1086
                                                        },
×
1087
                                                },
×
1088
                                        },
×
1089
                                        NodeSelector: selectors,
×
1090
                                        Tolerations:  gw.Spec.Tolerations,
×
1091
                                        Affinity:     &gw.Spec.Affinity,
×
1092
                                },
×
1093
                        },
×
1094
                        UpdateStrategy: v1.StatefulSetUpdateStrategy{
×
1095
                                Type: v1.RollingUpdateStatefulSetStrategyType,
×
1096
                        },
×
1097
                },
×
1098
        }
×
1099

×
1100
        // BGP speaker is enabled on this instance, add a BGP speaker to the statefulset
×
1101
        if gw.Spec.BgpSpeaker.Enabled {
×
1102
                // We need to connect to the K8S API to make the BGP speaker work, this implies a ServiceAccount
×
1103
                sts.Spec.Template.Spec.ServiceAccountName = "vpc-nat-gw"
×
1104
                sts.Spec.Template.Spec.AutomountServiceAccountToken = ptr.To(true)
×
1105

×
1106
                // Craft a BGP speaker container to add to our statefulset
×
1107
                bgpSpeakerContainer, err := util.GenNatGwBgpSpeakerContainer(gw.Spec.BgpSpeaker, vpcNatGwBgpSpeakerImage, gw.Name)
×
1108
                if err != nil {
×
1109
                        klog.Errorf("failed to create a BGP speaker container for gateway %s: %v", gw.Name, err)
×
1110
                        return nil, err
×
1111
                }
×
1112

1113
                // Add our container to the list of containers in the statefulset
1114
                sts.Spec.Template.Spec.Containers = append(sts.Spec.Template.Spec.Containers, *bgpSpeakerContainer)
×
1115
        }
1116

1117
        return sts, nil
×
1118
}
1119

1120
// getExternalSubnetNad returns the namespace and name of the NetworkAttachmentDefinition associated with
1121
// an external network attached to a NAT gateway
1122
func (c *Controller) getExternalSubnetNad(gw *kubeovnv1.VpcNatGateway) (string, string, error) {
1✔
1123
        externalNadNamespace := c.config.PodNamespace
1✔
1124
        // GetNatGwExternalNetwork returns the subnet name from ExternalSubnets, or "ovn-vpc-external-network" if empty
1✔
1125
        externalSubnetName := util.GetNatGwExternalNetwork(gw.Spec.ExternalSubnets)
1✔
1126

1✔
1127
        externalSubnet, err := c.subnetsLister.Get(externalSubnetName)
1✔
1128
        if err != nil {
2✔
1129
                err = fmt.Errorf("failed to get external subnet %s for NAT gateway %s: %w", externalSubnetName, gw.Name, err)
1✔
1130
                klog.Error(err)
1✔
1131
                return "", "", err
1✔
1132
        }
1✔
1133

1134
        // Try to parse NAD info from subnet's provider
1135
        if name, namespace, ok := util.GetNadBySubnetProvider(externalSubnet.Spec.Provider); ok {
2✔
1136
                return namespace, name, nil
1✔
1137
        }
1✔
1138

1139
        // Provider cannot be parsed to NAD info (e.g., provider is "ovn" or empty)
1140
        // Fall back to default NAD name which is the same as subnet name for external subnets
1141
        klog.Warningf("subnet %s provider %q cannot be parsed to NAD info, using default NAD %s/%s",
1✔
1142
                externalSubnetName, externalSubnet.Spec.Provider, externalNadNamespace, externalSubnetName)
1✔
1143
        return externalNadNamespace, externalSubnetName, nil
1✔
1144
}
1145

1146
func (c *Controller) cleanUpVpcNatGw() error {
×
1147
        gws, err := c.vpcNatGatewayLister.List(labels.Everything())
×
1148
        if err != nil {
×
1149
                klog.Errorf("failed to get vpc nat gateway, %v", err)
×
1150
                return err
×
1151
        }
×
1152
        for _, gw := range gws {
×
1153
                c.delVpcNatGatewayQueue.Add(gw.Name)
×
1154
        }
×
1155
        return nil
×
1156
}
1157

1158
func (c *Controller) getNatGwPod(name string) (*corev1.Pod, error) {
×
1159
        selector := labels.Set{"app": util.GenNatGwName(name), util.VpcNatGatewayLabel: "true"}.AsSelector()
×
1160
        pods, err := c.podsLister.Pods(c.config.PodNamespace).List(selector)
×
1161

×
1162
        switch {
×
1163
        case err != nil:
×
1164
                klog.Error(err)
×
1165
                return nil, err
×
1166
        case len(pods) == 0:
×
1167
                return nil, k8serrors.NewNotFound(v1.Resource("pod"), name)
×
1168
        case len(pods) != 1:
×
1169
                time.Sleep(5 * time.Second)
×
1170
                return nil, errors.New("too many pod")
×
1171
        case pods[0].Status.Phase != corev1.PodRunning:
×
1172
                time.Sleep(5 * time.Second)
×
1173
                return nil, errors.New("pod is not active now")
×
1174
        }
1175

1176
        return pods[0], nil
×
1177
}
1178

1179
func (c *Controller) initCreateAt(key string) (err error) {
×
1180
        if natGwCreatedAT != "" {
×
1181
                return nil
×
1182
        }
×
1183
        pod, err := c.getNatGwPod(key)
×
1184
        if err != nil {
×
1185
                klog.Error(err)
×
1186
                return err
×
1187
        }
×
1188
        natGwCreatedAT = pod.CreationTimestamp.Format("2006-01-02T15:04:05")
×
1189
        return nil
×
1190
}
1191

1192
func (c *Controller) updateCrdNatGwLabels(key, qos string) error {
×
1193
        oriGw, err := c.vpcNatGatewayLister.Get(key)
×
1194
        if err != nil {
×
1195
                errMsg := fmt.Errorf("failed to get vpc nat gw '%s', %w", key, err)
×
1196
                klog.Error(errMsg)
×
1197
                return errMsg
×
1198
        }
×
1199
        var needUpdateLabel bool
×
1200
        var op string
×
1201

×
1202
        // Create a new labels map to avoid modifying the informer cache
×
1203
        labels := make(map[string]string, len(oriGw.Labels)+3)
×
NEW
1204
        maps.Copy(labels, oriGw.Labels)
×
1205

×
UNCOV
1206
        // vpc nat gw label may lost
×
1207
        if len(oriGw.Labels) == 0 {
×
1208
                op = "add"
×
1209
                labels[util.SubnetNameLabel] = oriGw.Spec.Subnet
×
1210
                labels[util.VpcNameLabel] = oriGw.Spec.Vpc
×
1211
                labels[util.QoSLabel] = qos
×
1212
                needUpdateLabel = true
×
1213
        } else {
×
1214
                if oriGw.Labels[util.SubnetNameLabel] != oriGw.Spec.Subnet {
×
1215
                        op = "replace"
×
1216
                        labels[util.SubnetNameLabel] = oriGw.Spec.Subnet
×
1217
                        needUpdateLabel = true
×
1218
                }
×
1219
                if oriGw.Labels[util.VpcNameLabel] != oriGw.Spec.Vpc {
×
1220
                        op = "replace"
×
1221
                        labels[util.VpcNameLabel] = oriGw.Spec.Vpc
×
1222
                        needUpdateLabel = true
×
1223
                }
×
1224
                if oriGw.Labels[util.QoSLabel] != qos {
×
1225
                        op = "replace"
×
1226
                        labels[util.QoSLabel] = qos
×
1227
                        needUpdateLabel = true
×
1228
                }
×
1229
        }
1230
        if needUpdateLabel {
×
1231
                patchPayloadTemplate := `[{ "op": "%s", "path": "/metadata/labels", "value": %s }]`
×
1232
                raw, _ := json.Marshal(labels)
×
1233
                patchPayload := fmt.Sprintf(patchPayloadTemplate, op, raw)
×
1234
                if _, err := c.config.KubeOvnClient.KubeovnV1().VpcNatGateways().Patch(context.Background(), oriGw.Name, types.JSONPatchType,
×
1235
                        []byte(patchPayload), metav1.PatchOptions{}); err != nil {
×
1236
                        klog.Errorf("failed to patch vpc nat gw %s: %v", oriGw.Name, err)
×
1237
                        return err
×
1238
                }
×
1239
        }
1240
        return nil
×
1241
}
1242

1243
func (c *Controller) patchNatGwQoSStatus(key, qos string) error {
×
1244
        // add qos label to vpc nat gw
×
1245
        var changed bool
×
1246
        oriGw, err := c.vpcNatGatewayLister.Get(key)
×
1247
        if err != nil {
×
1248
                if k8serrors.IsNotFound(err) {
×
1249
                        return nil
×
1250
                }
×
1251
                klog.Errorf("failed to get vpc nat gw %s, %v", key, err)
×
1252
                return err
×
1253
        }
1254
        gw := oriGw.DeepCopy()
×
1255

×
1256
        // update status.qosPolicy
×
1257
        if gw.Status.QoSPolicy != qos {
×
1258
                gw.Status.QoSPolicy = qos
×
1259
                changed = true
×
1260
        }
×
1261

1262
        if changed {
×
1263
                bytes, err := gw.Status.Bytes()
×
1264
                if err != nil {
×
1265
                        klog.Errorf("failed to marshal vpc nat gw %s status, %v", gw.Name, err)
×
1266
                        return err
×
1267
                }
×
1268
                if _, err = c.config.KubeOvnClient.KubeovnV1().VpcNatGateways().Patch(context.Background(), gw.Name, types.MergePatchType,
×
1269
                        bytes, metav1.PatchOptions{}, "status"); err != nil {
×
1270
                        if k8serrors.IsNotFound(err) {
×
1271
                                return nil
×
1272
                        }
×
1273
                        klog.Errorf("failed to patch gw %s, %v", gw.Name, err)
×
1274
                        return err
×
1275
                }
1276
        }
1277
        return nil
×
1278
}
1279

1280
func (c *Controller) patchNatGwStatus(key string) error {
×
1281
        var changed bool
×
1282
        oriGw, err := c.vpcNatGatewayLister.Get(key)
×
1283
        if err != nil {
×
1284
                if k8serrors.IsNotFound(err) {
×
1285
                        return nil
×
1286
                }
×
1287
                klog.Errorf("failed to get vpc nat gw %s, %v", key, err)
×
1288
                return err
×
1289
        }
1290
        gw := oriGw.DeepCopy()
×
1291

×
1292
        if !slices.Equal(gw.Spec.ExternalSubnets, gw.Status.ExternalSubnets) {
×
1293
                gw.Status.ExternalSubnets = gw.Spec.ExternalSubnets
×
1294
                changed = true
×
1295
        }
×
1296
        if !slices.Equal(gw.Spec.Selector, gw.Status.Selector) {
×
1297
                gw.Status.Selector = gw.Spec.Selector
×
1298
                changed = true
×
1299
        }
×
1300
        if !reflect.DeepEqual(gw.Spec.Tolerations, gw.Status.Tolerations) {
×
1301
                gw.Status.Tolerations = gw.Spec.Tolerations
×
1302
                changed = true
×
1303
        }
×
1304
        if !reflect.DeepEqual(gw.Spec.Affinity, gw.Status.Affinity) {
×
1305
                gw.Status.Affinity = gw.Spec.Affinity
×
1306
                changed = true
×
1307
        }
×
1308

1309
        if changed {
×
1310
                bytes, err := gw.Status.Bytes()
×
1311
                if err != nil {
×
1312
                        klog.Error(err)
×
1313
                        return err
×
1314
                }
×
1315
                if _, err = c.config.KubeOvnClient.KubeovnV1().VpcNatGateways().Patch(context.Background(), gw.Name, types.MergePatchType,
×
1316
                        bytes, metav1.PatchOptions{}, "status"); err != nil {
×
1317
                        if k8serrors.IsNotFound(err) {
×
1318
                                return nil
×
1319
                        }
×
1320
                        klog.Errorf("failed to patch gw %s, %v", gw.Name, err)
×
1321
                        return err
×
1322
                }
1323
        }
1324
        return nil
×
1325
}
1326

1327
func (c *Controller) execNatGwQoS(gw *kubeovnv1.VpcNatGateway, qos, operation string) error {
×
1328
        qosPolicy, err := c.qosPoliciesLister.Get(qos)
×
1329
        if err != nil {
×
1330
                klog.Errorf("get qos policy %s failed: %v", qos, err)
×
1331
                return err
×
1332
        }
×
1333
        if !qosPolicy.Status.Shared {
×
1334
                err := fmt.Errorf("not support unshared qos policy %s to related to gw", qos)
×
1335
                klog.Error(err)
×
1336
                return err
×
1337
        }
×
1338
        if qosPolicy.Status.BindingType != kubeovnv1.QoSBindingTypeNatGw {
×
1339
                err := fmt.Errorf("not support qos policy %s binding type %s to related to gw", qos, qosPolicy.Status.BindingType)
×
1340
                klog.Error(err)
×
1341
                return err
×
1342
        }
×
1343
        return c.execNatGwBandwidthLimitRules(gw, qosPolicy.Status.BandwidthLimitRules, operation)
×
1344
}
1345

1346
func (c *Controller) execNatGwBandwidthLimitRules(gw *kubeovnv1.VpcNatGateway, rules kubeovnv1.QoSPolicyBandwidthLimitRules, operation string) error {
×
1347
        var err error
×
1348
        for _, rule := range rules {
×
1349
                if err = c.execNatGwQoSInPod(gw.Name, &rule, operation); err != nil {
×
1350
                        klog.Errorf("failed to %s %s gw '%s' qos in pod, %v", operation, rule.Direction, gw.Name, err)
×
1351
                        return err
×
1352
                }
×
1353
        }
1354
        return nil
×
1355
}
1356

1357
func (c *Controller) execNatGwQoSInPod(
1358
        dp string, r *kubeovnv1.QoSPolicyBandwidthLimitRule, operation string,
1359
) error {
×
1360
        gwPod, err := c.getNatGwPod(dp)
×
1361
        if err != nil {
×
1362
                klog.Errorf("failed to get nat gw pod, %v", err)
×
1363
                return err
×
1364
        }
×
1365
        var addRules []string
×
1366
        var classifierType, matchDirection, cidr string
×
1367
        switch r.MatchType {
×
1368
        case "ip":
×
1369
                classifierType = "u32"
×
1370
                // matchValue: dst xxx.xxx.xxx.xxx/32
×
1371
                splitStr := strings.Split(r.MatchValue, " ")
×
1372
                if len(splitStr) != 2 {
×
1373
                        err := fmt.Errorf("matchValue %s format error", r.MatchValue)
×
1374
                        klog.Error(err)
×
1375
                        return err
×
1376
                }
×
1377
                matchDirection = splitStr[0]
×
1378
                cidr = splitStr[1]
×
1379
        case "":
×
1380
                classifierType = "matchall"
×
1381
        default:
×
1382
                err := fmt.Errorf("MatchType %s format error", r.MatchType)
×
1383
                klog.Error(err)
×
1384
                return err
×
1385
        }
1386
        rule := fmt.Sprintf("%s,%s,%d,%s,%s,%s,%s,%s,%s",
×
1387
                r.Direction, r.Interface, r.Priority,
×
1388
                classifierType, r.MatchType, matchDirection,
×
1389
                cidr, r.RateMax, r.BurstMax)
×
1390
        addRules = append(addRules, rule)
×
1391

×
1392
        if err = c.execNatGwRules(gwPod, operation, addRules); err != nil {
×
1393
                err = fmt.Errorf("failed to exec nat gateway rule, err: %w", err)
×
1394
                klog.Error(err)
×
1395
                return err
×
1396
        }
×
1397
        return nil
×
1398
}
1399

1400
func (c *Controller) initVpcNatGw() error {
×
1401
        klog.Infof("init all vpc nat gateways")
×
1402
        gws, err := c.vpcNatGatewayLister.List(labels.Everything())
×
1403
        if err != nil {
×
1404
                err = fmt.Errorf("failed to get vpc nat gw list, %w", err)
×
1405
                klog.Error(err)
×
1406
                return err
×
1407
        }
×
1408
        if len(gws) == 0 {
×
1409
                return nil
×
1410
        }
×
1411

1412
        if vpcNatEnabled != "true" {
×
1413
                err := errors.New("iptables nat gw not enable")
×
1414
                klog.Warning(err)
×
1415
                return nil
×
1416
        }
×
1417

1418
        for _, gw := range gws {
×
1419
                pod, err := c.getNatGwPod(gw.Name)
×
1420
                if err != nil {
×
1421
                        // the nat gw maybe deleted
×
1422
                        err := fmt.Errorf("failed to get nat gw %s pod: %w", gw.Name, err)
×
1423
                        klog.Error(err)
×
1424
                        continue
×
1425
                }
1426

1427
                if isNatGateway, natGateway := c.checkIsPodVpcNatGw(pod); isNatGateway {
×
1428
                        if _, hasInit := pod.Annotations[util.VpcNatGatewayInitAnnotation]; hasInit {
×
1429
                                return nil
×
1430
                        }
×
1431
                        c.initVpcNatGatewayQueue.Add(natGateway)
×
1432
                }
1433
        }
1434
        return nil
×
1435
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc