• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 13853401919

14 Mar 2025 09:12AM UTC coverage: 22.003% (-0.009%) from 22.012%
13853401919

push

github

web-flow
bind to pod ips when env variable ENABLE_BIND_LOCAL_IP is set to true (#5049)

Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>

4 of 37 new or added lines in 4 files covered. (10.81%)

312 existing lines in 4 files now uncovered.

10261 of 46634 relevant lines covered (22.0%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/tproxy_linux.go
1
package daemon
2

3
import (
4
        "errors"
5
        "fmt"
6
        "io"
7
        "net"
8
        "strconv"
9
        "sync"
10
        "syscall"
11

12
        "github.com/containernetworking/plugins/pkg/ns"
13
        "github.com/vishvananda/netlink"
14
        "golang.org/x/sys/unix"
15
        corev1 "k8s.io/api/core/v1"
16
        "k8s.io/apimachinery/pkg/util/intstr"
17
        "k8s.io/klog/v2"
18
        "k8s.io/utils/ptr"
19
        "k8s.io/utils/set"
20

21
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
22
        "github.com/kubeovn/kube-ovn/pkg/ovs"
23
        goTProxy "github.com/kubeovn/kube-ovn/pkg/tproxy"
24
        "github.com/kubeovn/kube-ovn/pkg/util"
25
)
26

27
// customVPCPodIPToNs maps a pod IP (string) to the name of the pod's network
// namespace (string). Entries are stored by StartTProxyTCPPortProbe and
// removed by probePortInNs when the namespace can no longer be opened.
var customVPCPodIPToNs sync.Map

// customVPCPodTCPProbeIPPort maps a "podIP:port" key, built with
// util.JoinHostPort, to a bool recording whether the most recent TCP probe
// of that endpoint succeeded.
var customVPCPodTCPProbeIPPort sync.Map
31

32
func (c *Controller) StartTProxyForwarding() {
×
NEW
33
        for _, addr := range util.GetDefaultListenAddr() {
×
NEW
34
                protocol := "tcp"
×
NEW
35
                if util.CheckProtocol(addr) == kubeovnv1.ProtocolIPv6 {
×
NEW
36
                        protocol = "tcp6"
×
NEW
37
                }
×
38

NEW
39
                go func() {
×
NEW
40
                        tcpListener, err := goTProxy.ListenTCP(protocol, &net.TCPAddr{IP: net.ParseIP(addr), Port: util.TProxyListenPort})
×
NEW
41
                        if err != nil {
×
NEW
42
                                klog.Fatalf("Encountered error while binding listener: %s", err)
×
NEW
43
                                return
×
NEW
44
                        }
×
45

NEW
46
                        defer func() {
×
NEW
47
                                if err := tcpListener.Close(); err != nil {
×
NEW
48
                                        klog.Errorf("Error tcpListener Close err: %v", err)
×
NEW
49
                                }
×
50
                        }()
51

NEW
52
                        for {
×
NEW
53
                                conn, err := tcpListener.Accept()
×
NEW
54
                                if err != nil {
×
NEW
55
                                        klog.Fatalf("Unrecoverable error while accepting connection: %s", err)
×
NEW
56
                                        return
×
NEW
57
                                }
×
NEW
58
                                go handleRedirectFlow(conn)
×
59
                        }
60
                }()
61
        }
62
}
63

64
func getProbePorts(pod *corev1.Pod) set.Set[int32] {
×
65
        ports := set.New[int32]()
×
66
        for _, container := range pod.Spec.Containers {
×
67
                for _, probe := range [...]*corev1.Probe{container.LivenessProbe, container.ReadinessProbe} {
×
68
                        if probe == nil {
×
69
                                continue
×
70
                        }
71
                        var port intstr.IntOrString
×
72
                        switch {
×
73
                        case probe.TCPSocket != nil:
×
74
                                port = probe.TCPSocket.Port
×
75
                        case probe.HTTPGet != nil:
×
76
                                port = probe.HTTPGet.Port
×
77
                        case probe.GRPC != nil:
×
78
                                port = intstr.FromInt32(probe.GRPC.Port)
×
79
                        default:
×
80
                                continue
×
81
                        }
82
                        if port.Type == intstr.Int {
×
83
                                ports.Insert(port.IntVal)
×
84
                                continue
×
85
                        }
86
                        for _, p := range container.Ports {
×
87
                                if p.Name == port.StrVal {
×
88
                                        ports.Insert(p.ContainerPort)
×
89
                                        break
×
90
                                }
91
                        }
92
                }
93
        }
94

95
        ports.Delete(0)
×
NEW
96
        klog.V(3).Infof("probe ports for pod %s/%s: %v", pod.Namespace, pod.Name, ports.SortedList())
×
97
        return ports
×
98
}
99

100
func (c *Controller) StartTProxyTCPPortProbe() {
×
101
        pods, err := c.getTProxyConditionPod(false)
×
102
        if err != nil {
×
103
                return
×
104
        }
×
105

106
        for _, pod := range pods {
×
107
                iface := ovs.PodNameToPortName(pod.Name, pod.Namespace, util.OvnProvider)
×
108
                nsName, err := ovs.GetInterfacePodNs(iface)
×
109
                if err != nil {
×
110
                        klog.Errorf("failed to get netns for pod %s/%s: %v", pod.Namespace, pod.Name, err)
×
111
                        continue
×
112
                }
113
                if nsName == "" {
×
114
                        klog.Infof("netns for pod %s/%s not found", pod.Namespace, pod.Name)
×
115
                        continue
×
116
                }
117

118
                ports := getProbePorts(pod)
×
119
                for _, podIP := range pod.Status.PodIPs {
×
120
                        customVPCPodIPToNs.Store(podIP.IP, nsName)
×
121
                        for _, port := range ports.UnsortedList() {
×
122
                                probePortInNs(podIP.IP, port, true, nil)
×
123
                        }
×
124
                }
125
        }
126
}
127

128
func (c *Controller) runTProxyConfigWorker() {
×
129
        protocols := getProtocols(c.protocol)
×
130
        for _, protocol := range protocols {
×
131
                c.reconcileTProxyRoutes(protocol)
×
132
        }
×
133
}
134

135
func (c *Controller) reconcileTProxyRoutes(protocol string) {
×
136
        family, err := util.ProtocolToFamily(protocol)
×
137
        if err != nil {
×
138
                klog.Errorf("get Protocol %s family failed", protocol)
×
139
                return
×
140
        }
×
141

142
        if err := addRuleIfNotExist(family, TProxyOutputMark, TProxyOutputMask, util.TProxyRouteTable); err != nil {
×
143
                klog.Errorf("add output rule failed: %v", err)
×
144
                return
×
145
        }
×
146

147
        if err := addRuleIfNotExist(family, TProxyPreroutingMark, TProxyPreroutingMask, util.TProxyRouteTable); err != nil {
×
148
                klog.Errorf("add prerouting rule failed: %v", err)
×
149
                return
×
150
        }
×
151

152
        dst := GetDefaultRouteDst(protocol)
×
153
        if err := addRouteIfNotExist(family, util.TProxyRouteTable, &dst); err != nil {
×
154
                klog.Errorf("add tproxy route failed: %v", err)
×
155
                return
×
156
        }
×
157
}
158

159
func (c *Controller) cleanTProxyConfig() {
×
160
        protocols := getProtocols(c.protocol)
×
161
        for _, protocol := range protocols {
×
162
                c.cleanTProxyRoutes(protocol)
×
163
                c.cleanTProxyIPTableRules(protocol)
×
164
        }
×
165
}
166

167
func (c *Controller) cleanTProxyRoutes(protocol string) {
×
168
        family, err := util.ProtocolToFamily(protocol)
×
169
        if err != nil {
×
170
                klog.Errorf("get Protocol %s family failed", protocol)
×
171
                return
×
172
        }
×
173

174
        if err := deleteRuleIfExists(family, TProxyOutputMark); err != nil {
×
175
                klog.Errorf("delete tproxy route rule mark %v failed err: %v", TProxyOutputMark, err)
×
176
        }
×
177

178
        if err := deleteRuleIfExists(family, TProxyPreroutingMark); err != nil {
×
179
                klog.Errorf("delete tproxy route rule mark %v failed err: %v", TProxyPreroutingMark, err)
×
180
        }
×
181

182
        dst := GetDefaultRouteDst(protocol)
×
183
        if err := delRouteIfExist(family, util.TProxyRouteTable, &dst); err != nil {
×
184
                klog.Errorf("delete tproxy route rule mark %v failed err: %v", TProxyPreroutingMark, err)
×
185
        }
×
186
}
187

188
func addRuleIfNotExist(family int, mark, mask uint32, table int) error {
×
189
        curRules, err := netlink.RuleListFiltered(family, &netlink.Rule{Mark: mark}, netlink.RT_FILTER_MARK)
×
190
        if err != nil {
×
191
                return fmt.Errorf("list rules with mark %x failed err: %w", mark, err)
×
192
        }
×
193

194
        if len(curRules) != 0 {
×
195
                return nil
×
196
        }
×
197

198
        rule := netlink.NewRule()
×
199
        rule.Mark = mark
×
200
        rule.Mask = ptr.To(mask)
×
201
        rule.Table = table
×
202
        rule.Family = family
×
203

×
204
        if err = netlink.RuleAdd(rule); err != nil && !errors.Is(err, syscall.EEXIST) {
×
205
                klog.Errorf("add rule %v failed with err %v", rule, err)
×
206
                return err
×
207
        }
×
208

209
        return nil
×
210
}
211

212
func deleteRuleIfExists(family int, mark uint32) error {
×
213
        curRules, err := netlink.RuleListFiltered(family, &netlink.Rule{Mark: mark}, netlink.RT_FILTER_MARK)
×
214
        if err != nil {
×
215
                return fmt.Errorf("list rules with mark %x failed err: %w", mark, err)
×
216
        }
×
217

218
        if len(curRules) != 0 {
×
219
                for _, r := range curRules {
×
220
                        if err := netlink.RuleDel(&r); err != nil && !errors.Is(err, syscall.ENOENT) {
×
221
                                return fmt.Errorf("delete rule %v failed with err: %w", r, err)
×
222
                        }
×
223
                }
224
        }
225
        return nil
×
226
}
227

228
func addRouteIfNotExist(family, table int, dst *net.IPNet) error {
×
229
        curRoutes, err := netlink.RouteListFiltered(family, &netlink.Route{Table: table, Dst: dst}, netlink.RT_FILTER_TABLE|netlink.RT_FILTER_DST)
×
230
        if err != nil {
×
231
                return fmt.Errorf("list routes with table %d failed with err: %w", table, err)
×
232
        }
×
233

234
        if len(curRoutes) != 0 {
×
235
                return nil
×
236
        }
×
237

238
        link, err := netlink.LinkByName("lo")
×
239
        if err != nil {
×
240
                return errors.New("can't find device lo")
×
241
        }
×
242

243
        route := netlink.Route{
×
244
                LinkIndex: link.Attrs().Index,
×
245
                Dst:       dst,
×
246
                Table:     table,
×
247
                Scope:     unix.RT_SCOPE_HOST,
×
248
                Type:      unix.RTN_LOCAL,
×
249
        }
×
250

×
251
        if err = netlink.RouteReplace(&route); err != nil && !errors.Is(err, syscall.EEXIST) {
×
252
                klog.Errorf("add route %v failed with err %v", route, err)
×
253
                return err
×
254
        }
×
255

256
        return nil
×
257
}
258

259
func delRouteIfExist(family, table int, dst *net.IPNet) error {
×
260
        curRoutes, err := netlink.RouteListFiltered(family, &netlink.Route{Table: table}, netlink.RT_FILTER_TABLE)
×
261
        if err != nil {
×
262
                klog.Errorf("list routes with table %d failed with err: %v", table, err)
×
263
                return err
×
264
        }
×
265

266
        if len(curRoutes) == 0 {
×
267
                return nil
×
268
        }
×
269

270
        link, err := netlink.LinkByName("lo")
×
271
        if err != nil {
×
272
                return errors.New("can't find device lo")
×
273
        }
×
274

275
        route := netlink.Route{
×
276
                LinkIndex: link.Attrs().Index,
×
277
                Dst:       dst,
×
278
                Table:     table,
×
279
                Scope:     unix.RT_SCOPE_HOST,
×
280
                Type:      unix.RTN_LOCAL,
×
281
        }
×
282

×
283
        if err = netlink.RouteDel(&route); err != nil && !errors.Is(err, syscall.ENOENT) {
×
284
                klog.Errorf("del route %v failed with err %v", route, err)
×
285
                return err
×
286
        }
×
287

288
        return nil
×
289
}
290

291
func handleRedirectFlow(conn net.Conn) {
×
292
        klog.V(5).Infof("accepting TCP connection from %s to %s", conn.RemoteAddr(), conn.LocalAddr())
×
293
        defer func() {
×
294
                if err := conn.Close(); err != nil {
×
295
                        klog.Errorf("conn Close err: %v", err)
×
296
                }
×
297
        }()
298

299
        podIPPort := conn.LocalAddr().String()
×
300
        podIP, probePort, err := net.SplitHostPort(podIPPort)
×
301
        if err != nil {
×
302
                klog.Errorf("Get %s Pod IP and Port failed err: %v", podIPPort, err)
×
303
                return
×
304
        }
×
305

306
        port, err := strconv.ParseInt(probePort, 10, 32)
×
307
        if err != nil {
×
308
                klog.Errorf("failed to parse port number %q: %v", probePort, err)
×
309
                return
×
310
        }
×
311

312
        probePortInNs(podIP, int32(port), false, conn) // #nosec G115
×
313
}
314

315
func probePortInNs(podIP string, probePort int32, isTProxyProbe bool, conn net.Conn) {
×
316
        podNs, ok := customVPCPodIPToNs.Load(podIP)
×
317
        if !ok {
×
318
                klog.V(3).Infof("failed to get netns for pod with ip %s", podIP)
×
319
                return
×
320
        }
×
321

322
        podNS, err := ns.GetNS(podNs.(string))
×
323
        if err != nil {
×
324
                customVPCPodIPToNs.Delete(podIP)
×
325
                klog.V(3).Infof("netns %s not found", podNs)
×
326
                return
×
327
        }
×
328

329
        _ = ns.WithNetNSPath(podNS.Path(), func(_ ns.NetNS) error {
×
330
                // Packet's src and dst IP are both PodIP in netns
×
331
                localpodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP)}
×
332
                remotepodTCPAddr := net.TCPAddr{IP: net.ParseIP(podIP), Port: int(probePort)}
×
333

×
334
                remoteConn, err := goTProxy.DialTCP(&localpodTCPAddr, &remotepodTCPAddr, !isTProxyProbe)
×
335
                if err != nil {
×
336
                        if isTProxyProbe {
×
337
                                customVPCPodTCPProbeIPPort.Store(util.JoinHostPort(podIP, probePort), false)
×
338
                        }
×
339
                        return nil
×
340
                }
341

342
                if isTProxyProbe {
×
343
                        customVPCPodTCPProbeIPPort.Store(util.JoinHostPort(podIP, probePort), true)
×
344
                        return nil
×
345
                }
×
346

347
                defer func() {
×
348
                        if err := remoteConn.Close(); err != nil {
×
349
                                klog.Errorf("remoteConn %v Close err: %v", remoteConn, err)
×
350
                        }
×
351
                }()
352

353
                var streamWait sync.WaitGroup
×
354
                streamWait.Add(2)
×
355

×
356
                streamConn := func(dst io.Writer, src io.Reader) {
×
357
                        if _, err := io.Copy(dst, src); err != nil {
×
358
                                klog.Errorf("copy stream from dst %v to src %v failed err: %v", dst, src, err)
×
359
                        }
×
360

361
                        streamWait.Done()
×
362
                }
363

364
                go streamConn(remoteConn, conn)
×
365
                go streamConn(conn, remoteConn)
×
366

×
367
                streamWait.Wait()
×
368
                return nil
×
369
        })
370
}
371

372
func getProtocols(protocol string) []string {
×
373
        var protocols []string
×
374
        if protocol == kubeovnv1.ProtocolDual {
×
375
                protocols = append(protocols, kubeovnv1.ProtocolIPv4)
×
376
                protocols = append(protocols, kubeovnv1.ProtocolIPv6)
×
377
        } else {
×
378
                protocols = append(protocols, protocol)
×
379
        }
×
380
        return protocols
×
381
}
382

383
func GetDefaultRouteDst(protocol string) net.IPNet {
×
384
        var dst net.IPNet
×
385
        if protocol == kubeovnv1.ProtocolIPv4 {
×
386
                dst = net.IPNet{
×
387
                        IP:   net.IPv4zero,
×
388
                        Mask: net.CIDRMask(0, 0),
×
389
                }
×
390
        } else if protocol == kubeovnv1.ProtocolIPv6 {
×
391
                dst = net.IPNet{
×
392
                        IP:   net.IPv6zero,
×
393
                        Mask: net.CIDRMask(0, 0),
×
394
                }
×
395
        }
×
396
        return dst
×
397
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc