• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 17228354799

26 Aug 2025 04:55AM UTC coverage: 21.341% (-0.2%) from 21.508%
17228354799

push

github

oilbeater
handle delete final state unknown object in enqueue handler (#5649)

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>

0 of 443 new or added lines in 27 files covered. (0.0%)

1 existing line in 1 file now uncovered.

10514 of 49267 relevant lines covered (21.34%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

5.9
/pkg/controller/security_group.go
1
package controller
2

3
import (
4
        "context"
5
        "encoding/hex"
6
        "errors"
7
        "fmt"
8
        "net"
9
        "reflect"
10
        "slices"
11
        "strings"
12

13
        "github.com/cnf/structhash"
14
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
15
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
        "k8s.io/apimachinery/pkg/labels"
17
        "k8s.io/apimachinery/pkg/types"
18
        "k8s.io/client-go/tools/cache"
19
        "k8s.io/klog/v2"
20

21
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
22
        "github.com/kubeovn/kube-ovn/pkg/ovs"
23
        "github.com/kubeovn/kube-ovn/pkg/ovsdb/ovnnb"
24
        "github.com/kubeovn/kube-ovn/pkg/util"
25
)
26

27
func (c *Controller) enqueueAddSg(obj any) {
×
28
        key := cache.MetaObjectToName(obj.(*kubeovnv1.SecurityGroup)).String()
×
29
        klog.V(3).Infof("enqueue add securityGroup %s", key)
×
30
        c.addOrUpdateSgQueue.Add(key)
×
31
}
×
32

33
func (c *Controller) enqueueUpdateSg(oldObj, newObj any) {
×
34
        oldSg := oldObj.(*kubeovnv1.SecurityGroup)
×
35
        newSg := newObj.(*kubeovnv1.SecurityGroup)
×
36
        if !reflect.DeepEqual(oldSg.Spec, newSg.Spec) {
×
37
                key := cache.MetaObjectToName(newSg).String()
×
38
                klog.V(3).Infof("enqueue update securityGroup %s", key)
×
39
                c.addOrUpdateSgQueue.Add(key)
×
40
        }
×
41
}
42

43
func (c *Controller) enqueueDeleteSg(obj any) {
×
NEW
44
        var sg *kubeovnv1.SecurityGroup
×
NEW
45
        switch t := obj.(type) {
×
NEW
46
        case *kubeovnv1.SecurityGroup:
×
NEW
47
                sg = t
×
NEW
48
        case cache.DeletedFinalStateUnknown:
×
NEW
49
                s, ok := t.Obj.(*kubeovnv1.SecurityGroup)
×
NEW
50
                if !ok {
×
NEW
51
                        klog.Warningf("unexpected object type: %T", t.Obj)
×
NEW
52
                        return
×
NEW
53
                }
×
NEW
54
                sg = s
×
NEW
55
        default:
×
NEW
56
                klog.Warningf("unexpected type: %T", obj)
×
NEW
57
                return
×
58
        }
59

NEW
60
        key := cache.MetaObjectToName(sg).String()
×
61
        klog.V(3).Infof("enqueue delete securityGroup %s", key)
×
62
        c.delSgQueue.Add(key)
×
63
}
64

65
func (c *Controller) initDefaultDenyAllSecurityGroup() error {
×
66
        pgName := ovs.GetSgPortGroupName(util.DenyAllSecurityGroup)
×
67
        if err := c.OVNNbClient.CreatePortGroup(pgName, map[string]string{
×
68
                "type": "security_group",
×
69
                sgKey:  util.DenyAllSecurityGroup,
×
70
        }); err != nil {
×
71
                klog.Errorf("create port group for sg %s: %v", util.DenyAllSecurityGroup, err)
×
72
                return err
×
73
        }
×
74

75
        if err := c.OVNNbClient.CreateSgDenyAllACL(util.DenyAllSecurityGroup); err != nil {
×
76
                klog.Errorf("create deny all acl for sg %s: %v", util.DenyAllSecurityGroup, err)
×
77
                return err
×
78
        }
×
79

80
        c.addOrUpdateSgQueue.Add(util.DenyAllSecurityGroup)
×
81
        return nil
×
82
}
83

84
func (c *Controller) syncSecurityGroup() error {
×
85
        sgs, err := c.sgsLister.List(labels.Everything())
×
86
        if err != nil {
×
87
                klog.Errorf("failed to list security groups: %v", err)
×
88
                return err
×
89
        }
×
90
        for _, sg := range sgs {
×
91
                lost, err := c.OVNNbClient.SGLostACL(sg)
×
92
                if err != nil {
×
93
                        err = fmt.Errorf("failed to check if security group %s lost acl: %w", sg.Name, err)
×
94
                        klog.Error(err)
×
95
                        return err
×
96
                }
×
97
                if lost {
×
98
                        if err := c.handleAddOrUpdateSg(sg.Name, true); err != nil {
×
99
                                klog.Errorf("failed to sync security group %s: %v", sg.Name, err)
×
100
                        }
×
101
                }
102
        }
103
        return nil
×
104
}
105

106
// updateDenyAllSgPorts set lsp to deny which security_groups is not empty
107
func (c *Controller) updateDenyAllSgPorts() error {
×
108
        // list all lsp which security_groups is not empty
×
109
        lsps, err := c.OVNNbClient.ListNormalLogicalSwitchPorts(true, map[string]string{sgsKey: ""})
×
110
        if err != nil {
×
111
                klog.Errorf("list logical switch ports with security_groups is not empty: %v", err)
×
112
                return err
×
113
        }
×
114

115
        addPorts := make([]string, 0, len(lsps))
×
116
        for _, lsp := range lsps {
×
117
                /* skip lsp which security_group does not exist */
×
118
                // sgs format: sg1/sg2/sg3
×
119
                sgs := strings.Split(lsp.ExternalIDs[sgsKey], "/")
×
120
                allNotExist, err := c.securityGroupAllNotExist(sgs)
×
121
                if err != nil {
×
122
                        klog.Error(err)
×
123
                        return err
×
124
                }
×
125

126
                if allNotExist {
×
127
                        continue
×
128
                }
129

130
                addPorts = append(addPorts, lsp.Name)
×
131
        }
132
        pgName := ovs.GetSgPortGroupName(util.DenyAllSecurityGroup)
×
133

×
134
        klog.V(6).Infof("setting ports of port group %s to %v", pgName, addPorts)
×
135
        if err = c.OVNNbClient.PortGroupSetPorts(pgName, addPorts); err != nil {
×
136
                klog.Error(err)
×
137
                return err
×
138
        }
×
139

140
        return nil
×
141
}
142

143
func (c *Controller) handleAddOrUpdateSg(key string, force bool) error {
×
144
        c.sgKeyMutex.LockKey(key)
×
145
        defer func() { _ = c.sgKeyMutex.UnlockKey(key) }()
×
146
        klog.Infof("handle add/update security group %s", key)
×
147

×
148
        // set 'deny all' for port associated with security group
×
149
        if key == util.DenyAllSecurityGroup {
×
150
                if err := c.updateDenyAllSgPorts(); err != nil {
×
151
                        klog.Errorf("update sg deny all policy failed. %v", err)
×
152
                        return err
×
153
                }
×
154
        }
155

156
        cachedSg, err := c.sgsLister.Get(key)
×
157
        if err != nil {
×
158
                if k8serrors.IsNotFound(err) {
×
159
                        return nil
×
160
                }
×
161
                klog.Error(err)
×
162
                return err
×
163
        }
164
        sg := cachedSg.DeepCopy()
×
165

×
166
        if err = c.validateSgRule(sg); err != nil {
×
167
                klog.Error(err)
×
168
                return err
×
169
        }
×
170

171
        pgName := ovs.GetSgPortGroupName(sg.Name)
×
172
        if err := c.OVNNbClient.CreatePortGroup(pgName, map[string]string{
×
173
                "type": "security_group",
×
174
                sgKey:  sg.Name,
×
175
        }); err != nil {
×
176
                klog.Errorf("create port group for sg %s: %v", sg.Name, err)
×
177
                return err
×
178
        }
×
179

180
        v4AsName := ovs.GetSgV4AssociatedName(sg.Name)
×
181
        v6AsName := ovs.GetSgV6AssociatedName(sg.Name)
×
182
        externalIDs := map[string]string{
×
183
                sgKey: sg.Name,
×
184
        }
×
185

×
186
        if err = c.OVNNbClient.CreateAddressSet(v4AsName, externalIDs); err != nil {
×
187
                klog.Errorf("create address set %s for sg %s: %v", v4AsName, key, err)
×
188
                return err
×
189
        }
×
190

191
        if err = c.OVNNbClient.CreateAddressSet(v6AsName, externalIDs); err != nil {
×
192
                klog.Errorf("create address set %s for sg %s: %v", v6AsName, key, err)
×
193
                return err
×
194
        }
×
195

196
        var ingressNeedUpdate, egressNeedUpdate bool
×
197
        var newIngressMd5, newEgressMd5 string
×
198
        if force {
×
199
                klog.Infof("force update sg %s", sg.Name)
×
200
                ingressNeedUpdate = true
×
201
                egressNeedUpdate = true
×
202
        } else {
×
203
                // check md5
×
204
                newIngressMd5 = hex.EncodeToString(structhash.Md5(sg.Spec.IngressRules, 1))
×
205
                if !sg.Status.IngressLastSyncSuccess || newIngressMd5 != sg.Status.IngressMd5 {
×
206
                        klog.Infof("ingress need update, sg:%s", sg.Name)
×
207
                        ingressNeedUpdate = true
×
208
                }
×
209
                newEgressMd5 = hex.EncodeToString(structhash.Md5(sg.Spec.EgressRules, 1))
×
210
                if !sg.Status.EgressLastSyncSuccess || newEgressMd5 != sg.Status.EgressMd5 {
×
211
                        klog.Infof("egress need update, sg:%s", sg.Name)
×
212
                        egressNeedUpdate = true
×
213
                }
×
214

215
                // check allowSameGroupTraffic switch
216
                if sg.Status.AllowSameGroupTraffic != sg.Spec.AllowSameGroupTraffic {
×
217
                        klog.Infof("both ingress && egress need update, sg:%s", sg.Name)
×
218
                        ingressNeedUpdate = true
×
219
                        egressNeedUpdate = true
×
220
                }
×
221
        }
222

223
        // update sg rule
224
        if ingressNeedUpdate {
×
225
                if err = c.OVNNbClient.UpdateSgACL(sg, ovnnb.ACLDirectionToLport); err != nil {
×
226
                        sg.Status.IngressLastSyncSuccess = false
×
227
                        c.patchSgStatus(sg)
×
228
                        klog.Error(err)
×
229
                        return err
×
230
                }
×
231

232
                if err := c.OVNNbClient.CreateSgBaseACL(sg.Name, ovnnb.ACLDirectionToLport); err != nil {
×
233
                        klog.Error(err)
×
234
                        return err
×
235
                }
×
236
                sg.Status.IngressMd5 = newIngressMd5
×
237
                sg.Status.IngressLastSyncSuccess = true
×
238
                c.patchSgStatus(sg)
×
239
        }
240
        if egressNeedUpdate {
×
241
                if err = c.OVNNbClient.UpdateSgACL(sg, ovnnb.ACLDirectionFromLport); err != nil {
×
242
                        sg.Status.IngressLastSyncSuccess = false
×
243
                        c.patchSgStatus(sg)
×
244
                        klog.Error(err)
×
245
                        return err
×
246
                }
×
247

248
                if err := c.OVNNbClient.CreateSgBaseACL(sg.Name, ovnnb.ACLDirectionFromLport); err != nil {
×
249
                        klog.Error(err)
×
250
                        return err
×
251
                }
×
252

253
                sg.Status.EgressMd5 = newEgressMd5
×
254
                sg.Status.EgressLastSyncSuccess = true
×
255
                c.patchSgStatus(sg)
×
256
        }
257

258
        // update status
259
        sg.Status.PortGroup = ovs.GetSgPortGroupName(sg.Name)
×
260
        sg.Status.AllowSameGroupTraffic = sg.Spec.AllowSameGroupTraffic
×
261
        c.patchSgStatus(sg)
×
262
        c.syncSgPortsQueue.Add(key)
×
263
        return nil
×
264
}
265

266
func (c *Controller) validateSgRule(sg *kubeovnv1.SecurityGroup) error {
×
267
        // check sg rules
×
268
        allRules := append(sg.Spec.IngressRules, sg.Spec.EgressRules...)
×
269
        for _, rule := range allRules {
×
270
                if rule.IPVersion != "ipv4" && rule.IPVersion != "ipv6" {
×
271
                        return errors.New("IPVersion should be 'ipv4' or 'ipv6'")
×
272
                }
×
273

274
                if rule.Priority < 1 || rule.Priority > 200 {
×
275
                        return fmt.Errorf("priority '%d' is not in the range of 1 to 200", rule.Priority)
×
276
                }
×
277

278
                switch rule.RemoteType {
×
279
                case kubeovnv1.SgRemoteTypeAddress:
×
280
                        if strings.Contains(rule.RemoteAddress, "/") {
×
281
                                if _, _, err := net.ParseCIDR(rule.RemoteAddress); err != nil {
×
282
                                        return fmt.Errorf("invalid CIDR '%s'", rule.RemoteAddress)
×
283
                                }
×
284
                        } else {
×
285
                                if net.ParseIP(rule.RemoteAddress) == nil {
×
286
                                        return fmt.Errorf("invalid ip address '%s'", rule.RemoteAddress)
×
287
                                }
×
288
                        }
289
                case kubeovnv1.SgRemoteTypeSg:
×
290
                        _, err := c.sgsLister.Get(rule.RemoteSecurityGroup)
×
291
                        if err != nil {
×
292
                                return fmt.Errorf("failed to get remote sg '%s', %w", rule.RemoteSecurityGroup, err)
×
293
                        }
×
294
                default:
×
295
                        return fmt.Errorf("not support sgRemoteType '%s'", rule.RemoteType)
×
296
                }
297

298
                if rule.Protocol == kubeovnv1.SgProtocolTCP || rule.Protocol == kubeovnv1.SgProtocolUDP {
×
299
                        if rule.PortRangeMin < 1 || rule.PortRangeMin > 65535 || rule.PortRangeMax < 1 || rule.PortRangeMax > 65535 {
×
300
                                return errors.New("portRange is out of range")
×
301
                        }
×
302
                        if rule.PortRangeMin > rule.PortRangeMax {
×
303
                                return errors.New("portRange err, range Minimum value greater than maximum value")
×
304
                        }
×
305
                }
306
        }
307
        return nil
×
308
}
309

310
func (c *Controller) patchSgStatus(sg *kubeovnv1.SecurityGroup) {
×
311
        bytes, err := sg.Status.Bytes()
×
312
        if err != nil {
×
313
                klog.Error(err)
×
314
                return
×
315
        }
×
316
        if _, err = c.config.KubeOvnClient.KubeovnV1().SecurityGroups().Patch(context.Background(), sg.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status"); err != nil {
×
317
                klog.Error("patch security group status failed", err)
×
318
        }
×
319
}
320

321
func (c *Controller) handleDeleteSg(key string) error {
×
322
        c.sgKeyMutex.LockKey(key)
×
323
        defer func() { _ = c.sgKeyMutex.UnlockKey(key) }()
×
324
        klog.Infof("handle delete security group %s", key)
×
325

×
326
        if err := c.OVNNbClient.DeleteSecurityGroup(key); err != nil {
×
327
                klog.Errorf("delete sg %s: %v", key, err)
×
328
                return err
×
329
        }
×
330

331
        return nil
×
332
}
333

334
func (c *Controller) syncSgLogicalPort(key string) error {
×
335
        c.sgKeyMutex.LockKey(key)
×
336
        defer func() { _ = c.sgKeyMutex.UnlockKey(key) }()
×
337
        klog.Infof("sync lsp for security group %s", key)
×
338

×
339
        sgPorts, err := c.OVNNbClient.ListLogicalSwitchPorts(false, map[string]string{"associated_sg_" + key: "true"}, nil)
×
340
        if err != nil {
×
341
                klog.Errorf("failed to find logical port, %v", err)
×
342
                return err
×
343
        }
×
344

345
        var ports, v4s, v6s, addresses []string
×
346
        for _, lsp := range sgPorts {
×
347
                ports = append(ports, lsp.Name)
×
348
                if len(lsp.PortSecurity) != 0 {
×
349
                        addresses = lsp.PortSecurity
×
350
                } else {
×
351
                        addresses = lsp.Addresses
×
352
                }
×
353
                for _, as := range addresses {
×
354
                        fields := strings.Fields(as)
×
355
                        if len(fields) < 2 {
×
356
                                continue
×
357
                        }
358
                        for _, address := range fields[1:] {
×
359
                                if strings.Contains(address, ":") {
×
360
                                        v6s = append(v6s, address)
×
361
                                } else {
×
362
                                        v4s = append(v4s, address)
×
363
                                }
×
364
                        }
365
                }
366
        }
367

368
        sg, err := c.sgsLister.Get(key)
×
369
        if err != nil {
×
370
                if k8serrors.IsNotFound(err) {
×
371
                        klog.Warningf("no security group %s", key)
×
372
                        return nil
×
373
                }
×
374
                klog.Errorf("failed to get security group %s: %v", key, err)
×
375
                return err
×
376
        }
377

378
        if err = c.OVNNbClient.PortGroupSetPorts(sg.Status.PortGroup, ports); err != nil {
×
379
                klog.Errorf("add ports to port group %s: %v", sg.Status.PortGroup, err)
×
380
                return err
×
381
        }
×
382

383
        v4AsName := ovs.GetSgV4AssociatedName(key)
×
384
        if err := c.OVNNbClient.AddressSetUpdateAddress(v4AsName, v4s...); err != nil {
×
385
                klog.Errorf("set ips to address set %s: %v", v4AsName, err)
×
386
                return err
×
387
        }
×
388

389
        v6AsName := ovs.GetSgV6AssociatedName(key)
×
390
        if err := c.OVNNbClient.AddressSetUpdateAddress(v6AsName, v6s...); err != nil {
×
391
                klog.Errorf("set ips to address set %s: %v", v6AsName, err)
×
392
                return err
×
393
        }
×
394

395
        c.addOrUpdateSgQueue.Add(util.DenyAllSecurityGroup)
×
396
        return nil
×
397
}
398

399
func (c *Controller) getPortSg(port *ovnnb.LogicalSwitchPort) ([]string, error) {
1✔
400
        var sgList []string
1✔
401
        for key, value := range port.ExternalIDs {
2✔
402
                if strings.HasPrefix(key, "associated_sg_") && value == "true" {
2✔
403
                        sgName := strings.ReplaceAll(key, "associated_sg_", "")
1✔
404
                        sgList = append(sgList, sgName)
1✔
405
                }
1✔
406
        }
407
        return sgList, nil
1✔
408
}
409

410
func (c *Controller) reconcilePortSg(portName, securityGroups string) error {
×
411
        port, err := c.OVNNbClient.GetLogicalSwitchPort(portName, false)
×
412
        if err != nil {
×
413
                klog.Errorf("failed to get logical switch port %s: %v", portName, err)
×
414
                return err
×
415
        }
×
416
        oldSgList, err := c.getPortSg(port)
×
417
        if err != nil {
×
418
                klog.Errorf("get port sg failed, %v", err)
×
419
                return err
×
420
        }
×
421
        klog.Infof("reconcile port sg, port='%s', oldSgList='%s', newSgList='%s'", portName, oldSgList, securityGroups)
×
422

×
423
        newSgList := strings.Split(securityGroups, ",")
×
424
        diffSgList := util.DiffStringSlice(oldSgList, newSgList)
×
425
        for _, sgName := range diffSgList {
×
426
                if sgName == "" {
×
427
                        continue
×
428
                }
429
                needAssociated := "false"
×
430
                if slices.Contains(newSgList, sgName) {
×
431
                        needAssociated = "true"
×
432
                }
×
433

434
                if err = c.OVNNbClient.SetLogicalSwitchPortExternalIDs(portName, map[string]string{"associated_sg_" + sgName: needAssociated}); err != nil {
×
435
                        klog.Errorf("set logical switch port %s external_ids: %v", portName, err)
×
436
                        return err
×
437
                }
×
438
                c.syncSgPortsQueue.Add(sgName)
×
439
        }
440

441
        if err = c.OVNNbClient.SetLogicalSwitchPortExternalIDs(portName, map[string]string{"security_groups": strings.ReplaceAll(securityGroups, ",", "/")}); err != nil {
×
442
                klog.Errorf("set logical switch port %s external_ids: %v", portName, err)
×
443
                return err
×
444
        }
×
445

446
        return nil
×
447
}
448

449
// securityGroupAllNotExist return true if all sgs does not exist
450
func (c *Controller) securityGroupAllNotExist(sgs []string) (bool, error) {
1✔
451
        if len(sgs) == 0 {
2✔
452
                return true, nil
1✔
453
        }
1✔
454

455
        notExistsCount := 0
1✔
456
        // sgs format: sg1/sg2/sg3
1✔
457
        for _, sg := range sgs {
2✔
458
                ok, err := c.OVNNbClient.PortGroupExists(ovs.GetSgPortGroupName(sg))
1✔
459
                if err != nil {
1✔
460
                        klog.Error(err)
×
461
                        return true, err
×
462
                }
×
463

464
                if !ok {
2✔
465
                        notExistsCount++
1✔
466
                }
1✔
467
        }
468

469
        return notExistsCount == len(sgs), nil
1✔
470
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc