• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 20688199896

04 Jan 2026 05:25AM UTC coverage: 21.424% (-0.03%) from 21.453%
20688199896

push

github

changluyi
Add open flow sync refer to ovn-k8s (#6117)

* add openflow sync refer to ovn-k8s

Signed-off-by: clyi <clyi@alauda.io>

0 of 180 new or added lines in 5 files covered. (0.0%)

1 existing line in 1 file now uncovered.

10637 of 49650 relevant lines covered (21.42%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/daemon/flow_sync_linux.go
1
package daemon
2

3
import (
4
        "time"
5

6
        openvswitch "github.com/digitalocean/go-openvswitch/ovs"
7
        "k8s.io/apimachinery/pkg/util/sets"
8
        "k8s.io/klog/v2"
9

10
        "github.com/kubeovn/kube-ovn/pkg/ovs"
11
        "github.com/kubeovn/kube-ovn/pkg/util"
12
)
13

14
const flowSyncPeriod = 15 * time.Second
15

16
var managedFlowCookieSet = sets.New[uint64](
17
        util.UnderlaySvcLocalOpenFlowCookieV4,
18
        util.UnderlaySvcLocalOpenFlowCookieV6,
19
)
20

NEW
21
func (c *Controller) requestFlowSync() {
×
NEW
22
        if c.flowChan == nil {
×
NEW
23
                util.LogFatalAndExit(nil, "flowChan is not initialized")
×
NEW
24
        }
×
25

NEW
26
        select {
×
NEW
27
        case c.flowChan <- struct{}{}:
×
NEW
28
                klog.V(5).Info("OpenFlow sync requested")
×
NEW
29
        default:
×
NEW
30
                klog.V(5).Info("OpenFlow sync already requested")
×
31
        }
32
}
33

NEW
34
func (c *Controller) syncFlows() {
×
NEW
35
        if c.ovsClient == nil {
×
NEW
36
                util.LogFatalAndExit(nil, "ovsClient is not initialized")
×
NEW
37
        }
×
38

NEW
39
        flowCacheByBridge := c.storeFlowCache()
×
NEW
40

×
NEW
41
        bridges, err := ovs.Bridges()
×
NEW
42
        if err != nil {
×
NEW
43
                klog.Errorf("failed to list bridges: %v", err)
×
NEW
44
                return
×
NEW
45
        }
×
46

NEW
47
        for _, bridgeName := range bridges {
×
NEW
48
                existing, err := ovs.DumpFlows(c.ovsClient, bridgeName)
×
NEW
49
                if err != nil {
×
NEW
50
                        klog.Errorf("failed to dump flows for bridge %s: %v", bridgeName, err)
×
NEW
51
                        continue
×
52
                }
53

NEW
54
                preserved := filterUnmanagedFlows(existing)
×
NEW
55
                cachedFlows := flowCacheByBridge[bridgeName]
×
NEW
56
                finalFlows := append(preserved, cachedFlows...)
×
NEW
57

×
NEW
58
                if err := ovs.ReplaceFlows(bridgeName, finalFlows); err != nil {
×
NEW
59
                        klog.Errorf("failed to replace flows for bridge %s: %v", bridgeName, err)
×
NEW
60
                        continue
×
61
                }
NEW
62
                if len(cachedFlows) == 0 {
×
NEW
63
                        klog.V(5).Infof("no cached flows for bridge %s", bridgeName)
×
NEW
64
                        continue
×
65
                }
NEW
66
                klog.V(3).Infof("synced %d cached flows on bridge %s", len(cachedFlows), bridgeName)
×
67
        }
68
}
69

NEW
70
func (c *Controller) storeFlowCache() map[string][]string {
×
NEW
71
        snapshot := make(map[string][]string)
×
NEW
72

×
NEW
73
        c.flowCacheMutex.RLock()
×
NEW
74
        defer c.flowCacheMutex.RUnlock()
×
NEW
75

×
NEW
76
        for bridgeName, entries := range c.flowCache {
×
NEW
77
                for _, flows := range entries {
×
NEW
78
                        snapshot[bridgeName] = append(snapshot[bridgeName], flows...)
×
NEW
79
                }
×
80
        }
81

NEW
82
        return snapshot
×
83
}
84

NEW
85
func filterUnmanagedFlows(flows []string) []string {
×
NEW
86
        filtered := make([]string, 0, len(flows))
×
NEW
87
        for _, flow := range flows {
×
NEW
88
                if isManagedFlow(flow) {
×
NEW
89
                        continue
×
90
                }
NEW
91
                filtered = append(filtered, flow)
×
92
        }
NEW
93
        return filtered
×
94
}
95

NEW
96
func isManagedFlow(flow string) bool {
×
NEW
97
        var f openvswitch.Flow
×
NEW
98
        if err := f.UnmarshalText([]byte(flow)); err != nil {
×
NEW
99
                return false
×
NEW
100
        }
×
NEW
101
        return managedFlowCookieSet.Has(f.Cookie)
×
102
}
103

NEW
104
func (c *Controller) runFlowSync(stopCh <-chan struct{}) {
×
NEW
105
        klog.Info("Starting OpenFlow sync loop")
×
NEW
106

×
NEW
107
        ticker := time.NewTicker(flowSyncPeriod)
×
NEW
108
        defer ticker.Stop()
×
NEW
109
        for {
×
NEW
110
                select {
×
NEW
111
                case <-ticker.C:
×
NEW
112
                        c.syncFlows()
×
NEW
113
                case <-c.flowChan:
×
NEW
114
                        klog.V(5).Info("Immediate OpenFlow sync triggered")
×
NEW
115
                        c.syncFlows()
×
NEW
116
                        ticker.Reset(flowSyncPeriod)
×
NEW
117
                case <-stopCh:
×
NEW
118
                        klog.Info("Stopping OpenFlow sync loop")
×
NEW
119
                        return
×
120
                }
121
        }
122
}
123

NEW
124
func (c *Controller) setFlowCache(cache map[string]map[string][]string, bridgeName, key string, flows []string) {
×
NEW
125
        c.flowCacheMutex.Lock()
×
NEW
126
        defer c.flowCacheMutex.Unlock()
×
NEW
127

×
NEW
128
        if cache[bridgeName] == nil {
×
NEW
129
                cache[bridgeName] = make(map[string][]string)
×
NEW
130
        }
×
NEW
131
        cache[bridgeName][key] = flows
×
132
}
133

NEW
134
func (c *Controller) deleteFlowCache(cache map[string]map[string][]string, bridgeName, key string) {
×
NEW
135
        c.flowCacheMutex.Lock()
×
NEW
136
        defer c.flowCacheMutex.Unlock()
×
NEW
137
        delete(cache[bridgeName], key)
×
NEW
138
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc