• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 15277293151

27 May 2025 01:57PM UTC coverage: 21.827% (-0.02%) from 21.847%
15277293151

push

github

web-flow
base: add CAP_NET_ADMIN to traceroute (#5275)

Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>

10367 of 47497 relevant lines covered (21.83%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/tproxy_linux.go
1
package daemon
2

3
import (
4
        "errors"
5
        "fmt"
6
        "io"
7
        "net"
8
        "strconv"
9
        "sync"
10
        "syscall"
11

12
        "github.com/containernetworking/plugins/pkg/ns"
13
        "github.com/vishvananda/netlink"
14
        "golang.org/x/sys/unix"
15
        corev1 "k8s.io/api/core/v1"
16
        "k8s.io/apimachinery/pkg/util/intstr"
17
        "k8s.io/klog/v2"
18
        "k8s.io/utils/ptr"
19
        "k8s.io/utils/set"
20

21
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
22
        "github.com/kubeovn/kube-ovn/pkg/ovs"
23
        goTProxy "github.com/kubeovn/kube-ovn/pkg/tproxy"
24
        "github.com/kubeovn/kube-ovn/pkg/util"
25
)
26

27
var (
28
        customVPCPodIPToNs         sync.Map
29
        customVPCPodTCPProbeIPPort sync.Map
30
)
31

32
func (c *Controller) StartTProxyForwarding() {
×
33
        for _, addr := range util.GetDefaultListenAddr() {
×
34
                protocol := "tcp"
×
35
                if util.CheckProtocol(addr) == kubeovnv1.ProtocolIPv6 {
×
36
                        protocol = "tcp6"
×
37
                }
×
38

39
                go func() {
×
40
                        tcpListener, err := goTProxy.ListenTCP(protocol, &net.TCPAddr{IP: net.ParseIP(addr), Port: util.TProxyListenPort})
×
41
                        if err != nil {
×
42
                                klog.Fatalf("Encountered error while binding listener: %s", err)
×
43
                                return
×
44
                        }
×
45

46
                        defer func() {
×
47
                                if err := tcpListener.Close(); err != nil {
×
48
                                        klog.Errorf("Error tcpListener Close err: %v", err)
×
49
                                }
×
50
                        }()
51

52
                        for {
×
53
                                conn, err := tcpListener.Accept()
×
54
                                if err != nil {
×
55
                                        klog.Fatalf("Unrecoverable error while accepting connection: %s", err)
×
56
                                        return
×
57
                                }
×
58
                                go handleRedirectFlow(conn)
×
59
                        }
60
                }()
61
        }
62
}
63

64
func getProbePorts(pod *corev1.Pod) set.Set[int32] {
×
65
        ports := set.New[int32]()
×
66
        for _, container := range pod.Spec.Containers {
×
67
                for _, probe := range [...]*corev1.Probe{container.LivenessProbe, container.ReadinessProbe} {
×
68
                        if probe == nil {
×
69
                                continue
×
70
                        }
71
                        var port intstr.IntOrString
×
72
                        switch {
×
73
                        case probe.TCPSocket != nil:
×
74
                                port = probe.TCPSocket.Port
×
75
                        case probe.HTTPGet != nil:
×
76
                                port = probe.HTTPGet.Port
×
77
                        case probe.GRPC != nil:
×
78
                                port = intstr.FromInt32(probe.GRPC.Port)
×
79
                        default:
×
80
                                continue
×
81
                        }
82
                        if port.Type == intstr.Int {
×
83
                                ports.Insert(port.IntVal)
×
84
                                continue
×
85
                        }
86
                        for _, p := range container.Ports {
×
87
                                if p.Name == port.StrVal {
×
88
                                        ports.Insert(p.ContainerPort)
×
89
                                        break
×
90
                                }
91
                        }
92
                }
93
        }
94

95
        ports.Delete(0)
×
96
        klog.V(3).Infof("probe ports for pod %s/%s: %v", pod.Namespace, pod.Name, ports.SortedList())
×
97
        return ports
×
98
}
99

100
func (c *Controller) StartTProxyTCPPortProbe() {
×
101
        pods, err := c.getTProxyConditionPod(false)
×
102
        if err != nil {
×
103
                return
×
104
        }
×
105

106
        for _, pod := range pods {
×
107
                podName := pod.Name
×
108
                if vmName := pod.Annotations[util.VMAnnotation]; vmName != "" {
×
109
                        podName = vmName
×
110
                }
×
111
                iface := ovs.PodNameToPortName(podName, pod.Namespace, util.OvnProvider)
×
112
                nsName, err := ovs.GetInterfacePodNs(iface)
×
113
                if err != nil {
×
114
                        klog.Errorf("failed to get netns for pod %s/%s: %v", pod.Namespace, pod.Name, err)
×
115
                        continue
×
116
                }
117
                if nsName == "" {
×
118
                        klog.Infof("netns for pod %s/%s not found", pod.Namespace, pod.Name)
×
119
                        continue
×
120
                }
121

122
                ports := getProbePorts(pod)
×
123
                for _, podIP := range pod.Status.PodIPs {
×
124
                        customVPCPodIPToNs.Store(podIP.IP, nsName)
×
125
                        for _, port := range ports.UnsortedList() {
×
126
                                probePortInNs(podIP.IP, port, true, nil)
×
127
                        }
×
128
                }
129
        }
130
}
131

132
func (c *Controller) runTProxyConfigWorker() {
×
133
        protocols := getProtocols(c.protocol)
×
134
        for _, protocol := range protocols {
×
135
                c.reconcileTProxyRoutes(protocol)
×
136
        }
×
137
}
138

139
func (c *Controller) reconcileTProxyRoutes(protocol string) {
×
140
        family, err := util.ProtocolToFamily(protocol)
×
141
        if err != nil {
×
142
                klog.Errorf("get Protocol %s family failed", protocol)
×
143
                return
×
144
        }
×
145

146
        if err := addRuleIfNotExist(family, TProxyOutputMark, TProxyOutputMask, util.TProxyRouteTable); err != nil {
×
147
                klog.Errorf("add output rule failed: %v", err)
×
148
                return
×
149
        }
×
150

151
        if err := addRuleIfNotExist(family, TProxyPreroutingMark, TProxyPreroutingMask, util.TProxyRouteTable); err != nil {
×
152
                klog.Errorf("add prerouting rule failed: %v", err)
×
153
                return
×
154
        }
×
155

156
        dst := GetDefaultRouteDst(protocol)
×
157
        if err := addRouteIfNotExist(family, util.TProxyRouteTable, &dst); err != nil {
×
158
                klog.Errorf("add tproxy route failed: %v", err)
×
159
                return
×
160
        }
×
161
}
162

163
func (c *Controller) cleanTProxyConfig() {
×
164
        protocols := getProtocols(c.protocol)
×
165
        for _, protocol := range protocols {
×
166
                c.cleanTProxyRoutes(protocol)
×
167
                c.cleanTProxyIPTableRules(protocol)
×
168
        }
×
169
}
170

171
func (c *Controller) cleanTProxyRoutes(protocol string) {
×
172
        family, err := util.ProtocolToFamily(protocol)
×
173
        if err != nil {
×
174
                klog.Errorf("get Protocol %s family failed", protocol)
×
175
                return
×
176
        }
×
177

178
        if err := deleteRuleIfExists(family, TProxyOutputMark); err != nil {
×
179
                klog.Errorf("delete tproxy route rule mark %v failed err: %v", TProxyOutputMark, err)
×
180
        }
×
181

182
        if err := deleteRuleIfExists(family, TProxyPreroutingMark); err != nil {
×
183
                klog.Errorf("delete tproxy route rule mark %v failed err: %v", TProxyPreroutingMark, err)
×
184
        }
×
185

186
        dst := GetDefaultRouteDst(protocol)
×
187
        if err := delRouteIfExist(family, util.TProxyRouteTable, &dst); err != nil {
×
188
                klog.Errorf("delete tproxy route rule mark %v failed err: %v", TProxyPreroutingMark, err)
×
189
        }
×
190
}
191

192
func addRuleIfNotExist(family int, mark, mask uint32, table int) error {
×
193
        curRules, err := netlink.RuleListFiltered(family, &netlink.Rule{Mark: mark}, netlink.RT_FILTER_MARK)
×
194
        if err != nil {
×
195
                return fmt.Errorf("list rules with mark %x failed err: %w", mark, err)
×
196
        }
×
197

198
        if len(curRules) != 0 {
×
199
                return nil
×
200
        }
×
201

202
        rule := netlink.NewRule()
×
203
        rule.Mark = mark
×
204
        rule.Mask = ptr.To(mask)
×
205
        rule.Table = table
×
206
        rule.Family = family
×
207

×
208
        if err = netlink.RuleAdd(rule); err != nil && !errors.Is(err, syscall.EEXIST) {
×
209
                klog.Errorf("add rule %v failed with err %v", rule, err)
×
210
                return err
×
211
        }
×
212

213
        return nil
×
214
}
215

216
func deleteRuleIfExists(family int, mark uint32) error {
×
217
        curRules, err := netlink.RuleListFiltered(family, &netlink.Rule{Mark: mark}, netlink.RT_FILTER_MARK)
×
218
        if err != nil {
×
219
                return fmt.Errorf("list rules with mark %x failed err: %w", mark, err)
×
220
        }
×
221

222
        if len(curRules) != 0 {
×
223
                for _, r := range curRules {
×
224
                        if err := netlink.RuleDel(&r); err != nil && !errors.Is(err, syscall.ENOENT) {
×
225
                                return fmt.Errorf("delete rule %v failed with err: %w", r, err)
×
226
                        }
×
227
                }
228
        }
229
        return nil
×
230
}
231

232
func addRouteIfNotExist(family, table int, dst *net.IPNet) error {
×
233
        curRoutes, err := netlink.RouteListFiltered(family, &netlink.Route{Table: table, Dst: dst}, netlink.RT_FILTER_TABLE|netlink.RT_FILTER_DST)
×
234
        if err != nil {
×
235
                return fmt.Errorf("list routes with table %d failed with err: %w", table, err)
×
236
        }
×
237

238
        if len(curRoutes) != 0 {
×
239
                return nil
×
240
        }
×
241

242
        link, err := netlink.LinkByName("lo")
×
243
        if err != nil {
×
244
                return errors.New("can't find device lo")
×
245
        }
×
246

247
        route := netlink.Route{
×
248
                LinkIndex: link.Attrs().Index,
×
249
                Dst:       dst,
×
250
                Table:     table,
×
251
                Scope:     unix.RT_SCOPE_HOST,
×
252
                Type:      unix.RTN_LOCAL,
×
253
        }
×
254

×
255
        if err = netlink.RouteReplace(&route); err != nil && !errors.Is(err, syscall.EEXIST) {
×
256
                klog.Errorf("add route %v failed with err %v", route, err)
×
257
                return err
×
258
        }
×
259

260
        return nil
×
261
}
262

263
func delRouteIfExist(family, table int, dst *net.IPNet) error {
×
264
        curRoutes, err := netlink.RouteListFiltered(family, &netlink.Route{Table: table}, netlink.RT_FILTER_TABLE)
×
265
        if err != nil {
×
266
                klog.Errorf("list routes with table %d failed with err: %v", table, err)
×
267
                return err
×
268
        }
×
269

270
        if len(curRoutes) == 0 {
×
271
                return nil
×
272
        }
×
273

274
        link, err := netlink.LinkByName("lo")
×
275
        if err != nil {
×
276
                return errors.New("can't find device lo")
×
277
        }
×
278

279
        route := netlink.Route{
×
280
                LinkIndex: link.Attrs().Index,
×
281
                Dst:       dst,
×
282
                Table:     table,
×
283
                Scope:     unix.RT_SCOPE_HOST,
×
284
                Type:      unix.RTN_LOCAL,
×
285
        }
×
286

×
287
        if err = netlink.RouteDel(&route); err != nil && !errors.Is(err, syscall.ENOENT) {
×
288
                klog.Errorf("del route %v failed with err %v", route, err)
×
289
                return err
×
290
        }
×
291

292
        return nil
×
293
}
294

295
func handleRedirectFlow(conn net.Conn) {
×
296
        klog.V(5).Infof("accepting TCP connection from %s to %s", conn.RemoteAddr(), conn.LocalAddr())
×
297
        defer func() {
×
298
                if err := conn.Close(); err != nil {
×
299
                        klog.Errorf("conn Close err: %v", err)
×
300
                }
×
301
        }()
302

303
        podIPPort := conn.LocalAddr().String()
×
304
        podIP, probePort, err := net.SplitHostPort(podIPPort)
×
305
        if err != nil {
×
306
                klog.Errorf("Get %s Pod IP and Port failed err: %v", podIPPort, err)
×
307
                return
×
308
        }
×
309

310
        port, err := strconv.ParseInt(probePort, 10, 32)
×
311
        if err != nil {
×
312
                klog.Errorf("failed to parse port number %q: %v", probePort, err)
×
313
                return
×
314
        }
×
315

316
        probePortInNs(podIP, int32(port), false, conn) // #nosec G115
×
317
}
318

319
func probePortInNs(podIP string, probePort int32, isTProxyProbe bool, conn net.Conn) {
×
320
        podNs, ok := customVPCPodIPToNs.Load(podIP)
×
321
        if !ok {
×
322
                klog.V(3).Infof("failed to get netns for pod with ip %s", podIP)
×
323
                return
×
324
        }
×
325

326
        podNS, err := ns.GetNS(podNs.(string))
×
327
        if err != nil {
×
328
                customVPCPodIPToNs.Delete(podIP)
×
329
                klog.V(3).Infof("netns %s not found", podNs)
×
330
                return
×
331
        }
×
332

333
        _ = ns.WithNetNSPath(podNS.Path(), func(_ ns.NetNS) error {
×
334
                // Packet's src and dst IP are both PodIP in netns
×
335
                localpodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP)}
×
336
                remotepodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP), Port: int(probePort)}
×
337

×
338
                remoteConn, err := goTProxy.DialTCP(&localpodTCPAddr, &remotepodTCPAddr, !isTProxyProbe)
×
339
                if err != nil {
×
340
                        if isTProxyProbe {
×
341
                                customVPCPodTCPProbeIPPort.Store(util.JoinHostPort(podIP, probePort), false)
×
342
                        }
×
343
                        return nil
×
344
                }
345

346
                if isTProxyProbe {
×
347
                        customVPCPodTCPProbeIPPort.Store(util.JoinHostPort(podIP, probePort), true)
×
348
                        return nil
×
349
                }
×
350

351
                defer func() {
×
352
                        if err := remoteConn.Close(); err != nil {
×
353
                                klog.Errorf("remoteConn %v Close err: %v", remoteConn, err)
×
354
                        }
×
355
                }()
356

357
                var streamWait sync.WaitGroup
×
358
                streamWait.Add(2)
×
359

×
360
                streamConn := func(dst io.Writer, src io.Reader) {
×
361
                        if _, err := io.Copy(dst, src); err != nil {
×
362
                                klog.Errorf("copy stream from dst %v to src %v failed err: %v", dst, src, err)
×
363
                        }
×
364

365
                        streamWait.Done()
×
366
                }
367

368
                go streamConn(remoteConn, conn)
×
369
                go streamConn(conn, remoteConn)
×
370

×
371
                streamWait.Wait()
×
372
                return nil
×
373
        })
374
}
375

376
func getProtocols(protocol string) []string {
×
377
        var protocols []string
×
378
        if protocol == kubeovnv1.ProtocolDual {
×
379
                protocols = append(protocols, kubeovnv1.ProtocolIPv4)
×
380
                protocols = append(protocols, kubeovnv1.ProtocolIPv6)
×
381
        } else {
×
382
                protocols = append(protocols, protocol)
×
383
        }
×
384
        return protocols
×
385
}
386

387
func GetDefaultRouteDst(protocol string) net.IPNet {
×
388
        var dst net.IPNet
×
389
        switch protocol {
×
390
        case kubeovnv1.ProtocolIPv4:
×
391
                dst = net.IPNet{
×
392
                        IP:   net.IPv4zero,
×
393
                        Mask: net.CIDRMask(0, 0),
×
394
                }
×
395
        case kubeovnv1.ProtocolIPv6:
×
396
                dst = net.IPNet{
×
397
                        IP:   net.IPv6zero,
×
398
                        Mask: net.CIDRMask(0, 0),
×
399
                }
×
400
        }
401
        return dst
×
402
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc