• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 27017056817

05 Jun 2026 01:15PM UTC coverage: 25.681% (+0.09%) from 25.587%
27017056817

push

github

web-flow
perf(daemon): list subnets and pods once per gateway setter (#6827)

* perf(daemon): list subnets and pods once per gateway setter

runGateway runs every 3s, and each of setIPSet/setPolicyRouting/setIptables
re-listed the whole subnet cache up to 15 times per dual-stack tick (and pods
2-4 times). List subnets/pods once per setter and thread the slice through the
helpers, dropping the now-redundant error returns whose only source was List.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

* perf(daemon): log getCidrByProtocol errors instead of dropping them

Address review feedback: getSubnetsNeedNAT, getSubnetsDistributedGateway and
getDefaultVpcSubnetsCIDR silently skipped subnets when getCidrByProtocol
failed, and StartTProxyTCPPortProbe swallowed getTProxyConditionPod errors.
Log these errors to match the existing handling in reconcileNatOutGoingPolicyIPset
and generateNatOutgoingPolicyChainRules; the error path only triggers on
malformed CIDRs, not on normal single-stack subnets.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

---------

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

0 of 80 new or added lines in 3 files covered. (0.0%)

433 existing lines in 5 files now uncovered.

14867 of 57891 relevant lines covered (25.68%)

0.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/tproxy_linux.go
1
package daemon
2

3
import (
4
        "errors"
5
        "fmt"
6
        "io"
7
        "net"
8
        "strconv"
9
        "sync"
10
        "syscall"
11

12
        "github.com/containernetworking/plugins/pkg/ns"
13
        "github.com/vishvananda/netlink"
14
        "golang.org/x/sys/unix"
15
        corev1 "k8s.io/api/core/v1"
16
        "k8s.io/apimachinery/pkg/labels"
17
        "k8s.io/apimachinery/pkg/util/intstr"
18
        "k8s.io/klog/v2"
19
        "k8s.io/utils/set"
20

21
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
22
        "github.com/kubeovn/kube-ovn/pkg/ovs"
23
        goTProxy "github.com/kubeovn/kube-ovn/pkg/tproxy"
24
        "github.com/kubeovn/kube-ovn/pkg/util"
25
)
26

27
// customVPCPodIPToNs caches, keyed by pod IP string, the netns name returned
// by ovs.GetInterfacePodNs. probePortInNs reads it and evicts an entry when
// the namespace can no longer be opened.
var customVPCPodIPToNs sync.Map

// customVPCPodTCPProbeIPPort records, keyed by the host:port string built
// with util.JoinHostPort, whether the latest TCP probe of that endpoint
// succeeded (true) or failed (false).
var customVPCPodTCPProbeIPPort sync.Map
31

32
func (c *Controller) StartTProxyForwarding() {
×
33
        for _, addr := range util.GetDefaultListenAddr() {
×
34
                protocol := "tcp"
×
35
                if util.CheckProtocol(addr) == kubeovnv1.ProtocolIPv6 {
×
36
                        protocol = "tcp6"
×
37
                }
×
38

39
                go func() {
×
40
                        tcpListener, err := goTProxy.ListenTCP(protocol, &net.TCPAddr{IP: net.ParseIP(addr), Port: util.TProxyListenPort})
×
41
                        if err != nil {
×
42
                                klog.Fatalf("Encountered error while binding listener: %s", err)
×
43
                                return
×
44
                        }
×
45

46
                        defer func() {
×
47
                                if err := tcpListener.Close(); err != nil {
×
48
                                        klog.Errorf("Error tcpListener Close err: %v", err)
×
49
                                }
×
50
                        }()
51

52
                        for {
×
53
                                conn, err := tcpListener.Accept()
×
54
                                if err != nil {
×
55
                                        klog.Fatalf("Unrecoverable error while accepting connection: %s", err)
×
56
                                        return
×
57
                                }
×
58
                                go handleRedirectFlow(conn)
×
59
                        }
60
                }()
61
        }
62
}
63

64
func getProbePorts(pod *corev1.Pod) set.Set[int32] {
×
65
        ports := set.New[int32]()
×
66
        for _, container := range pod.Spec.Containers {
×
67
                for _, probe := range [...]*corev1.Probe{container.LivenessProbe, container.ReadinessProbe} {
×
68
                        if probe == nil {
×
69
                                continue
×
70
                        }
71
                        var port intstr.IntOrString
×
72
                        switch {
×
73
                        case probe.TCPSocket != nil:
×
74
                                port = probe.TCPSocket.Port
×
75
                        case probe.HTTPGet != nil:
×
76
                                port = probe.HTTPGet.Port
×
77
                        case probe.GRPC != nil:
×
78
                                port = intstr.FromInt32(probe.GRPC.Port)
×
79
                        default:
×
80
                                continue
×
81
                        }
82
                        if port.Type == intstr.Int {
×
83
                                ports.Insert(port.IntVal)
×
84
                                continue
×
85
                        }
86
                        for _, p := range container.Ports {
×
87
                                if p.Name == port.StrVal {
×
88
                                        ports.Insert(p.ContainerPort)
×
89
                                        break
×
90
                                }
91
                        }
92
                }
93
        }
94

95
        ports.Delete(0)
×
96
        klog.V(3).Infof("probe ports for pod %s/%s: %v", pod.Namespace, pod.Name, ports.SortedList())
×
97
        return ports
×
98
}
99

100
func (c *Controller) StartTProxyTCPPortProbe() {
×
NEW
101
        allPods, err := c.podsLister.List(labels.Everything())
×
102
        if err != nil {
×
NEW
103
                klog.Errorf("failed to list pods: %v", err)
×
NEW
104
                return
×
NEW
105
        }
×
NEW
106
        pods, err := c.getTProxyConditionPod(allPods, false)
×
NEW
107
        if err != nil {
×
NEW
108
                klog.Errorf("failed to get tproxy condition pods: %v", err)
×
109
                return
×
110
        }
×
111

112
        for _, pod := range pods {
×
113
                podName := pod.Name
×
114
                if vmName := pod.Annotations[util.VMAnnotation]; vmName != "" {
×
115
                        podName = vmName
×
116
                }
×
117
                iface := ovs.PodNameToPortName(podName, pod.Namespace, util.OvnProvider)
×
118
                nsName, err := ovs.GetInterfacePodNs(iface)
×
119
                if err != nil {
×
120
                        klog.Errorf("failed to get netns for pod %s/%s: %v", pod.Namespace, pod.Name, err)
×
121
                        continue
×
122
                }
123
                if nsName == "" {
×
124
                        klog.Infof("netns for pod %s/%s not found", pod.Namespace, pod.Name)
×
125
                        continue
×
126
                }
127

128
                ports := getProbePorts(pod)
×
129
                for _, podIP := range pod.Status.PodIPs {
×
130
                        customVPCPodIPToNs.Store(podIP.IP, nsName)
×
131
                        for _, port := range ports.UnsortedList() {
×
132
                                probePortInNs(podIP.IP, port, true, nil)
×
133
                        }
×
134
                }
135
        }
136
}
137

138
func (c *Controller) runTProxyConfigWorker() {
×
139
        protocols := getProtocols(c.protocol)
×
140
        for _, protocol := range protocols {
×
141
                c.reconcileTProxyRoutes(protocol)
×
142
        }
×
143
}
144

145
func (c *Controller) reconcileTProxyRoutes(protocol string) {
×
146
        family, err := util.ProtocolToFamily(protocol)
×
147
        if err != nil {
×
148
                klog.Errorf("get Protocol %s family failed", protocol)
×
149
                return
×
150
        }
×
151

152
        if err := addRuleIfNotExist(family, TProxyOutputMark, TProxyOutputMask, util.TProxyRouteTable); err != nil {
×
153
                klog.Errorf("add output rule failed: %v", err)
×
154
                return
×
155
        }
×
156

157
        if err := addRuleIfNotExist(family, TProxyPreroutingMark, TProxyPreroutingMask, util.TProxyRouteTable); err != nil {
×
158
                klog.Errorf("add prerouting rule failed: %v", err)
×
159
                return
×
160
        }
×
161

162
        dst := GetDefaultRouteDst(protocol)
×
163
        if err := addRouteIfNotExist(family, util.TProxyRouteTable, &dst); err != nil {
×
164
                klog.Errorf("add tproxy route failed: %v", err)
×
165
                return
×
166
        }
×
167
}
168

169
func (c *Controller) cleanTProxyConfig() {
×
170
        protocols := getProtocols(c.protocol)
×
171
        for _, protocol := range protocols {
×
172
                c.cleanTProxyRoutes(protocol)
×
173
                c.cleanTProxyIPTableRules(protocol)
×
174
        }
×
175
}
176

177
func (c *Controller) cleanTProxyRoutes(protocol string) {
×
178
        family, err := util.ProtocolToFamily(protocol)
×
179
        if err != nil {
×
180
                klog.Errorf("get Protocol %s family failed", protocol)
×
181
                return
×
182
        }
×
183

184
        if err := deleteRuleIfExists(family, TProxyOutputMark); err != nil {
×
185
                klog.Errorf("delete tproxy route rule mark %v failed err: %v", TProxyOutputMark, err)
×
186
        }
×
187

188
        if err := deleteRuleIfExists(family, TProxyPreroutingMark); err != nil {
×
189
                klog.Errorf("delete tproxy route rule mark %v failed err: %v", TProxyPreroutingMark, err)
×
190
        }
×
191

192
        dst := GetDefaultRouteDst(protocol)
×
193
        if err := delRouteIfExist(family, util.TProxyRouteTable, &dst); err != nil {
×
194
                klog.Errorf("delete tproxy route rule mark %v failed err: %v", TProxyPreroutingMark, err)
×
195
        }
×
196
}
197

198
func addRuleIfNotExist(family int, mark, mask uint32, table int) error {
×
199
        curRules, err := netlink.RuleListFiltered(family, &netlink.Rule{Mark: mark}, netlink.RT_FILTER_MARK)
×
200
        if err != nil {
×
201
                return fmt.Errorf("list rules with mark %x failed err: %w", mark, err)
×
202
        }
×
203

204
        if len(curRules) != 0 {
×
205
                return nil
×
206
        }
×
207

208
        rule := netlink.NewRule()
×
209
        rule.Mark = mark
×
210
        rule.Mask = new(mask)
×
211
        rule.Table = table
×
212
        rule.Family = family
×
213

×
214
        if err = netlink.RuleAdd(rule); err != nil && !errors.Is(err, syscall.EEXIST) {
×
215
                klog.Errorf("add rule %v failed with err %v", rule, err)
×
216
                return err
×
217
        }
×
218

219
        return nil
×
220
}
221

222
func deleteRuleIfExists(family int, mark uint32) error {
×
223
        curRules, err := netlink.RuleListFiltered(family, &netlink.Rule{Mark: mark}, netlink.RT_FILTER_MARK)
×
224
        if err != nil {
×
225
                return fmt.Errorf("list rules with mark %x failed err: %w", mark, err)
×
226
        }
×
227

228
        if len(curRules) != 0 {
×
229
                for _, r := range curRules {
×
230
                        if err := netlink.RuleDel(&r); err != nil && !errors.Is(err, syscall.ENOENT) {
×
231
                                return fmt.Errorf("delete rule %v failed with err: %w", r, err)
×
232
                        }
×
233
                }
234
        }
235
        return nil
×
236
}
237

238
func addRouteIfNotExist(family, table int, dst *net.IPNet) error {
×
239
        curRoutes, err := netlink.RouteListFiltered(family, &netlink.Route{Table: table, Dst: dst}, netlink.RT_FILTER_TABLE|netlink.RT_FILTER_DST)
×
240
        if err != nil {
×
241
                return fmt.Errorf("list routes with table %d failed with err: %w", table, err)
×
242
        }
×
243

244
        if len(curRoutes) != 0 {
×
245
                return nil
×
246
        }
×
247

248
        link, err := netlink.LinkByName("lo")
×
249
        if err != nil {
×
250
                return errors.New("can't find device lo")
×
251
        }
×
252

253
        route := netlink.Route{
×
254
                LinkIndex: link.Attrs().Index,
×
255
                Dst:       dst,
×
256
                Table:     table,
×
257
                Scope:     unix.RT_SCOPE_HOST,
×
258
                Type:      unix.RTN_LOCAL,
×
259
        }
×
260

×
261
        if err = netlink.RouteReplace(&route); err != nil && !errors.Is(err, syscall.EEXIST) {
×
262
                klog.Errorf("add route %v failed with err %v", route, err)
×
263
                return err
×
264
        }
×
265

266
        return nil
×
267
}
268

269
func delRouteIfExist(family, table int, dst *net.IPNet) error {
×
270
        curRoutes, err := netlink.RouteListFiltered(family, &netlink.Route{Table: table}, netlink.RT_FILTER_TABLE)
×
271
        if err != nil {
×
272
                klog.Errorf("list routes with table %d failed with err: %v", table, err)
×
273
                return err
×
274
        }
×
275

276
        if len(curRoutes) == 0 {
×
277
                return nil
×
278
        }
×
279

280
        link, err := netlink.LinkByName("lo")
×
281
        if err != nil {
×
282
                return errors.New("can't find device lo")
×
283
        }
×
284

285
        route := netlink.Route{
×
286
                LinkIndex: link.Attrs().Index,
×
287
                Dst:       dst,
×
288
                Table:     table,
×
289
                Scope:     unix.RT_SCOPE_HOST,
×
290
                Type:      unix.RTN_LOCAL,
×
291
        }
×
292

×
293
        if err = netlink.RouteDel(&route); err != nil && !errors.Is(err, syscall.ENOENT) {
×
294
                klog.Errorf("del route %v failed with err %v", route, err)
×
295
                return err
×
296
        }
×
297

298
        return nil
×
299
}
300

301
func handleRedirectFlow(conn net.Conn) {
×
302
        klog.V(5).Infof("accepting TCP connection from %s to %s", conn.RemoteAddr(), conn.LocalAddr())
×
303
        defer func() {
×
304
                if err := conn.Close(); err != nil {
×
305
                        klog.Errorf("conn Close err: %v", err)
×
306
                }
×
307
        }()
308

309
        podIPPort := conn.LocalAddr().String()
×
310
        podIP, probePort, err := net.SplitHostPort(podIPPort)
×
311
        if err != nil {
×
312
                klog.Errorf("Get %s Pod IP and Port failed err: %v", podIPPort, err)
×
313
                return
×
314
        }
×
315

316
        port, err := strconv.ParseInt(probePort, 10, 32)
×
317
        if err != nil {
×
318
                klog.Errorf("failed to parse port number %q: %v", probePort, err)
×
319
                return
×
320
        }
×
321

322
        probePortInNs(podIP, int32(port), false, conn) // #nosec G115
×
323
}
324

325
func probePortInNs(podIP string, probePort int32, isTProxyProbe bool, conn net.Conn) {
×
326
        podNs, ok := customVPCPodIPToNs.Load(podIP)
×
327
        if !ok {
×
328
                klog.V(3).Infof("failed to get netns for pod with ip %s", podIP)
×
329
                return
×
330
        }
×
331

332
        podNS, err := ns.GetNS(podNs.(string))
×
333
        if err != nil {
×
334
                customVPCPodIPToNs.Delete(podIP)
×
335
                klog.V(3).Infof("netns %s not found", podNs)
×
336
                return
×
337
        }
×
338
        defer podNS.Close()
×
339

×
340
        _ = ns.WithNetNSPath(podNS.Path(), func(_ ns.NetNS) error {
×
341
                // Packet's src and dst IP are both PodIP in netns
×
342
                localpodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP)}
×
343
                remotepodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP), Port: int(probePort)}
×
344

×
345
                remoteConn, err := goTProxy.DialTCP(&localpodTCPAddr, &remotepodTCPAddr, !isTProxyProbe)
×
346
                if err != nil {
×
347
                        if isTProxyProbe {
×
348
                                customVPCPodTCPProbeIPPort.Store(util.JoinHostPort(podIP, probePort), false)
×
349
                        }
×
350
                        return nil
×
351
                }
352

353
                if isTProxyProbe {
×
354
                        customVPCPodTCPProbeIPPort.Store(util.JoinHostPort(podIP, probePort), true)
×
355
                        return nil
×
356
                }
×
357

358
                defer func() {
×
359
                        if err := remoteConn.Close(); err != nil {
×
360
                                klog.Errorf("remoteConn %v Close err: %v", remoteConn, err)
×
361
                        }
×
362
                }()
363

364
                var streamWait sync.WaitGroup
×
365
                streamWait.Add(2)
×
366

×
367
                streamConn := func(dst io.Writer, src io.Reader) {
×
368
                        if _, err := io.Copy(dst, src); err != nil {
×
369
                                klog.Errorf("copy stream from dst %v to src %v failed err: %v", dst, src, err)
×
370
                        }
×
371

372
                        streamWait.Done()
×
373
                }
374

375
                go streamConn(remoteConn, conn)
×
376
                go streamConn(conn, remoteConn)
×
377

×
378
                streamWait.Wait()
×
379
                return nil
×
380
        })
381
}
382

383
func getProtocols(protocol string) []string {
×
384
        var protocols []string
×
385
        if protocol == kubeovnv1.ProtocolDual {
×
386
                protocols = append(protocols, kubeovnv1.ProtocolIPv4)
×
387
                protocols = append(protocols, kubeovnv1.ProtocolIPv6)
×
388
        } else {
×
389
                protocols = append(protocols, protocol)
×
390
        }
×
391
        return protocols
×
392
}
393

394
func GetDefaultRouteDst(protocol string) net.IPNet {
×
395
        var dst net.IPNet
×
396
        switch protocol {
×
397
        case kubeovnv1.ProtocolIPv4:
×
398
                dst = net.IPNet{
×
399
                        IP:   net.IPv4zero,
×
400
                        Mask: net.CIDRMask(0, 0),
×
401
                }
×
402
        case kubeovnv1.ProtocolIPv6:
×
403
                dst = net.IPNet{
×
404
                        IP:   net.IPv6zero,
×
405
                        Mask: net.CIDRMask(0, 0),
×
406
                }
×
407
        }
408
        return dst
×
409
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc