• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 8234

23 Nov 2023 06:48PM UTC coverage: 53.808% (-0.07%) from 53.873%
8234

push

travis-pro

web-flow
Merge pull request #1202 from noironetworks/platformconfig

Inform opflex-agent that opflexOdev is deleted

26 of 110 new or added lines in 6 files covered. (23.64%)

1 existing line in 1 file now uncovered.

13275 of 24671 relevant lines covered (53.81%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.77
/pkg/controller/nodes.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// Handlers for node updates.
16

17
package controller
18

19
import (
20
        "crypto/rand"
21
        "encoding/json"
22
        "errors"
23
        "fmt"
24
        "net"
25
        "net/http"
26

27
        v1 "k8s.io/api/core/v1"
28
        kubeerr "k8s.io/apimachinery/pkg/api/errors"
29
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30
        "k8s.io/apimachinery/pkg/fields"
31
        "k8s.io/apimachinery/pkg/labels"
32
        "k8s.io/client-go/kubernetes"
33
        "k8s.io/client-go/tools/cache"
34

35
        "github.com/sirupsen/logrus"
36

37
        "github.com/noironetworks/aci-containers/pkg/apicapi"
38
        crdv1 "github.com/noironetworks/aci-containers/pkg/gbpcrd/apis/acipolicy/v1"
39
        "github.com/noironetworks/aci-containers/pkg/gbpserver/watchers"
40
        "github.com/noironetworks/aci-containers/pkg/ipam"
41
        "github.com/noironetworks/aci-containers/pkg/metadata"
42
        nodePodIf "github.com/noironetworks/aci-containers/pkg/nodepodif/apis/acipolicy/v1"
43
        "github.com/noironetworks/aci-containers/pkg/util"
44
)
45

46
const (
47
        tunnelIDIncr = 2
48
)
49

50
type tunnelState struct {
51
        stateDriver  *watchers.K8sStateDriver
52
        nodeToTunnel map[string]int64
53
        nextID       int64
54
}
55

56
func (cont *AciController) initNodeInformerFromClient(
57
        kubeClient *kubernetes.Clientset) {
×
58
        cont.initNodeInformerBase(
×
59
                cache.NewListWatchFromClient(
×
60
                        kubeClient.CoreV1().RESTClient(), "nodes",
×
61
                        metav1.NamespaceAll, fields.Everything()))
×
62
}
×
63

64
func (cont *AciController) initNodeInformerBase(listWatch *cache.ListWatch) {
1✔
65
        cont.nodeIndexer, cont.nodeInformer = cache.NewIndexerInformer(
1✔
66
                listWatch, &v1.Node{}, 0,
1✔
67
                cache.ResourceEventHandlerFuncs{
1✔
68
                        AddFunc: func(obj interface{}) {
2✔
69
                                cont.syncPodNet(obj) // update cache
1✔
70
                                cont.nodeChanged(obj)
1✔
71
                        },
1✔
72
                        UpdateFunc: func(_ interface{}, obj interface{}) {
1✔
73
                                cont.nodeChanged(obj)
1✔
74
                        },
1✔
75
                        DeleteFunc: func(obj interface{}) {
1✔
76
                                cont.nodeDeleted(obj)
1✔
77
                        },
1✔
78
                },
79
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
80
        )
81
}
82

83
func apicNodeNetPol(name string, tenantName string,
84
        nodeIps map[string]bool) apicapi.ApicObject {
1✔
85
        hpp := apicapi.NewHostprotPol(tenantName, name)
1✔
86
        hppDn := hpp.GetDn()
1✔
87
        nodeSubj := apicapi.NewHostprotSubj(hppDn, "local-node")
1✔
88
        if len(nodeIps) > 0 {
2✔
89
                nodeSubjDn := nodeSubj.GetDn()
1✔
90
                outbound := apicapi.NewHostprotRule(nodeSubjDn, "allow-all-egress")
1✔
91
                outbound.SetAttr("direction", "egress")
1✔
92
                outbound.SetAttr("ethertype", "ipv4")
1✔
93
                outbound.SetAttr("connTrack", "normal")
1✔
94

1✔
95
                inbound := apicapi.NewHostprotRule(nodeSubjDn, "allow-all-ingress")
1✔
96
                inbound.SetAttr("direction", "ingress")
1✔
97
                inbound.SetAttr("ethertype", "ipv4")
1✔
98
                inbound.SetAttr("connTrack", "normal")
1✔
99

1✔
100
                for ip := range nodeIps {
2✔
101
                        outbound.AddChild(apicapi.NewHostprotRemoteIp(outbound.GetDn(), ip))
1✔
102
                        inbound.AddChild(apicapi.NewHostprotRemoteIp(inbound.GetDn(), ip))
1✔
103
                }
1✔
104

105
                nodeSubj.AddChild(inbound)
1✔
106
                nodeSubj.AddChild(outbound)
1✔
107
        }
108
        hpp.AddChild(nodeSubj)
1✔
109
        return hpp
1✔
110
}
111

112
func (cont *AciController) createNetPolForNode(node *v1.Node) {
1✔
113
        nodeIps := make(map[string]bool)
1✔
114
        for _, a := range node.Status.Addresses {
2✔
115
                if a.Address != "" &&
1✔
116
                        (a.Type == "InternalIP" || a.Type == "ExternalIP") {
2✔
117
                        nodeIps[a.Address] = true
1✔
118
                }
1✔
119
        }
120

121
        sgName := cont.aciNameForKey("node", node.Name)
1✔
122
        cont.apicConn.WriteApicObjects(sgName,
1✔
123
                apicapi.ApicSlice{
1✔
124
                        apicNodeNetPol(sgName, cont.config.AciPolicyTenant, nodeIps),
1✔
125
                })
1✔
126
}
127

128
func (cont *AciController) createServiceEndpoint(existing, ep *metadata.ServiceEndpoint, deviceMac, nodeName string) error {
1✔
129
        _, err := net.ParseMAC(deviceMac)
1✔
130
        if err == nil && deviceMac != "00:00:00:00:00:00" {
2✔
131
                ep.Mac = deviceMac
1✔
132
        } else {
2✔
133
                _, err := net.ParseMAC(existing.Mac)
1✔
134
                if err == nil {
2✔
135
                        ep.Mac = existing.Mac
1✔
136
                } else {
2✔
137
                        var mac net.HardwareAddr = make([]byte, 6)
1✔
138
                        _, err := rand.Read(mac)
1✔
139
                        if err != nil {
1✔
140
                                return err
×
141
                        }
×
142

143
                        mac[0] = (mac[0] & 254) | 2
1✔
144
                        ep.Mac = mac.String()
1✔
145
                }
146
        }
147

148
        if ep.Ipv4 == nil && existing.Ipv4 != nil &&
1✔
149
                cont.nodeServiceIps.V4.RemoveIp(existing.Ipv4) {
2✔
150
                ep.Ipv4 = existing.Ipv4
1✔
151
        }
1✔
152

153
        if ep.Ipv4 == nil {
2✔
154
                ipv4, err := cont.nodeServiceIps.V4.GetIp()
1✔
155
                if err == nil {
2✔
156
                        ep.Ipv4 = ipv4
1✔
157
                } else {
2✔
158
                        ep.Ipv4 = nil
1✔
159
                }
1✔
160
        }
161

162
        if ep.Ipv6 == nil && existing.Ipv6 != nil &&
1✔
163
                cont.nodeServiceIps.V6.RemoveIp(existing.Ipv6) {
1✔
164
                ep.Ipv6 = existing.Ipv6
×
165
        }
×
166

167
        if ep.Ipv6 == nil {
2✔
168
                ipv6, err := cont.nodeServiceIps.V6.GetIp()
1✔
169
                if err == nil {
2✔
170
                        ep.Ipv6 = ipv6
1✔
171
                } else {
2✔
172
                        ep.Ipv6 = nil
1✔
173
                }
1✔
174
        }
175

176
        if ep.Ipv4 == nil && ep.Ipv6 == nil {
2✔
177
                return errors.New("No IP addresses available for service endpoint")
1✔
178
        }
1✔
179

180
        if (ep.HealthGroupDn == "") && (cont.config.AciServiceMonitorInterval > 0) {
2✔
181
                name := cont.aciNameForKey("svc", nodeName)
1✔
182
                healthGroupObj := apicapi.NewVnsRedirectHealthGroup(cont.config.AciVrfTenant, name)
1✔
183
                ep.HealthGroupDn = healthGroupObj.GetDn()
1✔
184
                cont.apicConn.WriteApicObjects(name, apicapi.ApicSlice{healthGroupObj})
1✔
185
        }
1✔
186

187
        return nil
1✔
188
}
189

190
func (cont *AciController) nodeFullSync() {
1✔
191
        cache.ListAll(cont.nodeIndexer, labels.Everything(),
1✔
192
                func(nodeobj interface{}) {
2✔
193
                        cont.syncPodNet(nodeobj) // update the cache
1✔
194
                        cont.nodeChanged(nodeobj)
1✔
195
                })
1✔
196
}
197

198
// syncPodNet syncs in the podnets from the node object's annotation
199
func (cont *AciController) syncPodNet(obj interface{}) {
1✔
200
        cont.indexMutex.Lock()
1✔
201
        defer cont.indexMutex.Unlock()
1✔
202

1✔
203
        node := obj.(*v1.Node)
1✔
204
        logger := cont.log.WithFields(logrus.Fields{
1✔
205
                "Node": node.ObjectMeta.Name,
1✔
206
        })
1✔
207

1✔
208
        if node.ObjectMeta.Annotations == nil {
1✔
209
                return // nothing to sync
×
210
        }
×
211

212
        netval, ok := node.ObjectMeta.Annotations[metadata.PodNetworkRangeAnnotation]
1✔
213
        if !ok {
2✔
214
                return // nothing to sync
1✔
215
        }
1✔
216

217
        nodePodNet := newNodePodNetMeta()
1✔
218
        cont.mergePodNet(nodePodNet, netval, logger)
1✔
219
        cont.nodePodNetCache[node.ObjectMeta.Name] = nodePodNet
1✔
220
}
221

222
func maxof(x1, x2 int64) int64 {
×
223
        if x1 > x2 {
×
224
                return x1
×
225
        }
×
226

227
        return x2
×
228
}
229

230
func (cont *AciController) getTunnelID(node *v1.Node) int64 {
1✔
231
        if cont.config.LBType == lbTypeAci {
2✔
232
                return 0
1✔
233
        }
1✔
234

235
        nodeIP := getNodeIP(node, v1.NodeInternalIP)
×
236
        if nodeIP == "" {
×
237
                cont.log.Errorf("Can't get IP for node %s", node.Name)
×
238
                return 0
×
239
        }
×
240

241
        if cont.tunnelGetter == nil {
×
242
                tunnelGetter := &tunnelState{}
×
243
                tunnelGetter.stateDriver = &watchers.K8sStateDriver{}
×
244
                err := tunnelGetter.stateDriver.Init(watchers.FieldTunnelID)
×
245
                if err != nil {
×
246
                        cont.log.Warnf("Can't get tunnelID for %s", node.Name)
×
247
                        return 0
×
248
                }
×
249

250
                gbps, err := tunnelGetter.stateDriver.Get()
×
251
                if err != nil {
×
252
                        cont.log.Warnf("Can't get tunnelID for %s", node.Name)
×
253
                        return 0
×
254
                }
×
255

256
                cont.tunnelGetter = tunnelGetter
×
257
                if gbps.Status.TunnelIDs == nil {
×
258
                        cont.log.Infof("Initializing tunnelIDs")
×
259
                        tunnelGetter.nodeToTunnel = make(map[string]int64)
×
260
                        cont.tunnelGetter.nextID = 0
×
261
                } else {
×
262
                        tunnelGetter.nodeToTunnel = gbps.Status.TunnelIDs
×
263
                        for _, val := range tunnelGetter.nodeToTunnel {
×
264
                                cont.tunnelGetter.nextID = maxof(cont.tunnelGetter.nextID, val)
×
265
                        }
×
266

267
                        cont.tunnelGetter.nextID += tunnelIDIncr
×
268
                }
269
        }
270

271
        id, found := cont.tunnelGetter.nodeToTunnel[nodeIP]
×
272
        if found {
×
273
                return id + int64(cont.config.CSRTunnelIDBase)
×
274
        }
×
275

276
        id = cont.tunnelGetter.nextID
×
277
        if id >= int64(cont.config.MaxCSRTunnels*tunnelIDIncr) {
×
278
                cont.log.Infof("Max tunnels %d reached", cont.config.MaxCSRTunnels)
×
279
                return 0
×
280
        }
×
281

282
        // each allocated id packs two tunnels.
283
        cont.tunnelGetter.nodeToTunnel[nodeIP] = id
×
284
        cont.tunnelGetter.nextID = id + tunnelIDIncr
×
285
        state := &crdv1.GBPSState{}
×
286
        state.Status.TunnelIDs = cont.tunnelGetter.nodeToTunnel
×
287
        cont.tunnelGetter.stateDriver.Update(state)
×
288
        return id + int64(cont.config.CSRTunnelIDBase)
×
289
}
290

291
func (cont *AciController) writeApicNode(node *v1.Node) {
1✔
292
        if cont.config.ChainedMode {
1✔
293
                return
×
294
        }
×
295
        tunnelID := cont.getTunnelID(node)
1✔
296
        key := cont.aciNameForKey("node-vmm", node.Name)
1✔
297
        aobj := apicapi.NewVmmInjectedHost(cont.vmmDomainProvider(),
1✔
298
                cont.config.AciVmmDomain, cont.config.AciVmmController,
1✔
299
                node.Name)
1✔
300
        aobj.SetAttr("mgmtIp", getNodeIP(node, v1.NodeInternalIP))
1✔
301
        aobj.SetAttr("os", node.Status.NodeInfo.OSImage)
1✔
302
        aobj.SetAttr("kernelVer", node.Status.NodeInfo.KernelVersion)
1✔
303
        if apicapi.ApicVersion >= "5.0" {
1✔
304
                aobj.SetAttr("id", fmt.Sprintf("%v", tunnelID))
×
305
        }
×
306
        cont.apicConn.WriteApicObjects(key, apicapi.ApicSlice{aobj})
1✔
307
}
308

309
func getNodeIP(node *v1.Node, aType v1.NodeAddressType) string {
1✔
310
        for _, a := range node.Status.Addresses {
2✔
311
                if a.Type == aType {
2✔
312
                        return a.Address
1✔
313
                }
1✔
314
        }
315

316
        return ""
1✔
317
}
318

319
func (cont *AciController) nodeChangedByName(nodeName string) {
1✔
320
        node, exists, err := cont.nodeIndexer.GetByKey(nodeName)
1✔
321
        if err != nil {
1✔
322
                cont.log.Error("Could not lookup node: ", err)
×
323
                return
×
324
        }
×
325
        if exists && node != nil {
2✔
326
                cont.nodeChanged(node)
1✔
327
        }
1✔
328
}
329

330
func (cont *AciController) nodeChanged(obj interface{}) {
1✔
331
        if cont.config.ChainedMode {
1✔
332
                return
×
333
        }
×
334
        cont.indexMutex.Lock()
1✔
335

1✔
336
        node := obj.(*v1.Node)
1✔
337
        logger := cont.log.WithFields(logrus.Fields{
1✔
338
                "Node": node.ObjectMeta.Name,
1✔
339
        })
1✔
340

1✔
341
        nodeUpdated := false
1✔
342
        if node.ObjectMeta.Annotations == nil {
1✔
343
                node.ObjectMeta.Annotations = make(map[string]string)
×
344
        }
×
345

346
        nodeMeta, metaok := cont.nodeServiceMetaCache[node.ObjectMeta.Name]
1✔
347
        epval, epok := node.ObjectMeta.Annotations[metadata.ServiceEpAnnotation]
1✔
348
        deviceMac, hasDevice := cont.deviceMacForNode(node.ObjectMeta.Name)
1✔
349

1✔
350
        if cont.nodeSyncEnabled && hasDevice {
2✔
351
                if !metaok {
2✔
352
                        nodeMeta = &nodeServiceMeta{}
1✔
353
                        cont.nodeServiceMetaCache[node.ObjectMeta.Name] = nodeMeta
1✔
354
                }
1✔
355

356
                existing := &metadata.ServiceEndpoint{}
1✔
357
                if epok {
2✔
358
                        err := json.Unmarshal([]byte(epval), existing)
1✔
359
                        if err != nil {
1✔
360
                                logger.WithFields(logrus.Fields{
×
361
                                        "epval": epval,
×
362
                                }).Warn("Could not parse existing node ",
×
363
                                        "service endpoint annotation: ", err)
×
364
                        }
×
365
                }
366

367
                cont.createServiceEndpoint(existing, &nodeMeta.serviceEp, deviceMac, node.ObjectMeta.Name)
1✔
368
                raw, err := json.Marshal(&nodeMeta.serviceEp)
1✔
369
                if err != nil {
1✔
370
                        logger.Error("Could not create node service endpoint annotation", err)
×
371
                } else {
1✔
372
                        serviceEpAnnotation := string(raw)
1✔
373
                        if !epok || serviceEpAnnotation != epval {
2✔
374
                                node.ObjectMeta.Annotations[metadata.ServiceEpAnnotation] =
1✔
375
                                        serviceEpAnnotation
1✔
376
                                nodeUpdated = true
1✔
377
                                cont.updateServicesForNode(node.ObjectMeta.Name)
1✔
378
                                cont.snatFullSync()
1✔
379
                        }
1✔
380
                }
381
        }
382

383
        nodePodNet, ok := cont.nodePodNetCache[node.ObjectMeta.Name]
1✔
384
        if !ok {
2✔
385
                nodePodNet = newNodePodNetMeta()
1✔
386
                cont.nodePodNetCache[node.ObjectMeta.Name] = nodePodNet
1✔
387
        }
1✔
388

389
        netval := node.ObjectMeta.Annotations[metadata.PodNetworkRangeAnnotation]
1✔
390
        if cont.nodeSyncEnabled {
2✔
391
                cont.checkNodePodNet(node.ObjectMeta.Name)
1✔
392
                if netval != nodePodNet.podNetIpsAnnotation {
2✔
393
                        logger.Debug("Overwriting existing pod network: ", netval)
1✔
394
                        node.ObjectMeta.Annotations[metadata.PodNetworkRangeAnnotation] =
1✔
395
                                nodePodNet.podNetIpsAnnotation
1✔
396
                        nodeUpdated = true
1✔
397
                }
1✔
398
        }
399

400
        nodeAciPodAnnotation, ok := cont.nodeACIPodAnnot[node.ObjectMeta.Name]
1✔
401
        if ok {
2✔
402
                nodeAciPod := nodeAciPodAnnotation.aciPod
1✔
403
                aciPodAnn := node.ObjectMeta.Annotations[metadata.NodeAciPodAnnotation]
1✔
404
                if cont.nodeSyncEnabled {
2✔
405
                        if aciPodAnn != nodeAciPod && nodeAciPod != "" {
1✔
NEW
406
                                node.ObjectMeta.Annotations[metadata.NodeAciPodAnnotation] = nodeAciPod
×
NEW
407
                                nodeUpdated = true
×
NEW
408
                        }
×
409
                }
410
        } else {
1✔
411
                var annot aciPodAnnot
1✔
412
                cont.nodeACIPodAnnot[node.ObjectMeta.Name] = annot
1✔
413
        }
1✔
414

415
        if cont.config.AciMultipod {
1✔
416
                nodeAciPodAnnot, ok := cont.nodeACIPod[node.ObjectMeta.Name]
×
417
                if ok {
×
418
                        nodeAciPod := nodeAciPodAnnot.aciPod
×
419
                        aciPodAnn := node.ObjectMeta.Annotations[metadata.AciPodAnnotation]
×
420
                        if cont.nodeSyncEnabled {
×
421
                                if aciPodAnn != nodeAciPod && nodeAciPod != "" {
×
422
                                        node.ObjectMeta.Annotations[metadata.AciPodAnnotation] = nodeAciPod
×
423
                                        nodeUpdated = true
×
424
                                }
×
425
                        }
426
                } else {
×
427
                        var annot aciPodAnnot
×
428
                        cont.nodeACIPod[node.ObjectMeta.Name] = annot
×
429
                }
×
430
        }
431
        cont.indexMutex.Unlock()
1✔
432

1✔
433
        cont.createNetPolForNode(node)
1✔
434
        cont.writeApicNode(node)
1✔
435

1✔
436
        if nodeUpdated {
2✔
437
                _, err := cont.updateNode(node)
1✔
438
                if err != nil {
1✔
439
                        var serr *kubeerr.StatusError
×
440
                        if errors.As(err, &serr) {
×
441
                                if serr.ErrStatus.Code == http.StatusConflict {
×
442
                                        logger.Debug("Conflict updating node; ",
×
443
                                                "will retry on next update")
×
444
                                        return
×
445
                                }
×
446
                        }
447
                        logger.Error("Failed to update node: ", err)
×
448
                } else {
1✔
449
                        logger.WithFields(logrus.Fields{
1✔
450
                                "ServiceEpAnnotation": node.
1✔
451
                                        ObjectMeta.Annotations[metadata.ServiceEpAnnotation],
1✔
452
                                "PodNetworkRangeAnnotation": node.
1✔
453
                                        ObjectMeta.Annotations[metadata.PodNetworkRangeAnnotation],
1✔
454
                        }).Info("Updated node annotations")
1✔
455
                }
1✔
456
        }
457
}
458

459
func (cont *AciController) nodeDeleted(obj interface{}) {
1✔
460
        if cont.config.ChainedMode {
1✔
461
                return
×
462
        }
×
463
        node, isNode := obj.(*v1.Node)
1✔
464
        if !isNode {
1✔
465
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
466
                if !ok {
×
467
                        cont.log.Error("Received unexpected object: ", obj)
×
468
                        return
×
469
                }
×
470
                node, ok = deletedState.Obj.(*v1.Node)
×
471
                if !ok {
×
472
                        cont.log.Error("DeletedFinalStateUnknown contained non-Node object: ", deletedState.Obj)
×
473
                        return
×
474
                }
×
475
        }
476
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("node", node.Name))
1✔
477
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("node-vmm", node.Name))
1✔
478
        cont.log.Infof("Node deleted: %s", node.ObjectMeta.Name)
1✔
479

1✔
480
        cont.indexMutex.Lock()
1✔
481
        defer cont.indexMutex.Unlock()
1✔
482

1✔
483
        if existing, ok := cont.nodeServiceMetaCache[node.ObjectMeta.Name]; ok {
2✔
484
                if existing.serviceEp.Ipv4 != nil {
2✔
485
                        cont.nodeServiceIps.V4.AddIp(existing.serviceEp.Ipv4)
1✔
486
                }
1✔
487
                if existing.serviceEp.Ipv6 != nil {
1✔
488
                        cont.nodeServiceIps.V6.AddIp(existing.serviceEp.Ipv6)
×
489
                }
×
490
        }
491
        delete(cont.nodeServiceMetaCache, node.ObjectMeta.Name)
1✔
492
        cont.updateServicesForNode(node.ObjectMeta.Name)
1✔
493
        cont.snatFullSync()
1✔
494
        if _, ok := cont.snatNodeInfoCache[node.ObjectMeta.Name]; ok {
1✔
495
                env := cont.env.(*K8sEnvironment)
×
496
                nodeinfocl := env.nodeInfoClient
×
497
                //TODO Add reconcile here on failure
×
498
                if nodeinfocl != nil {
×
499
                        err := util.DeleteNodeInfoCR(*nodeinfocl, node.ObjectMeta.Name)
×
500
                        if err != nil {
×
501
                                cont.log.Error("Could not delete the NodeInfo", node.ObjectMeta.Name)
×
502
                                return
×
503
                        }
×
504
                        cont.log.Debug("Successfully Deleted NodeInfoCR for node: ", node.ObjectMeta.Name)
×
505
                }
506
                nodeinfo := cont.snatNodeInfoCache[node.ObjectMeta.Name]
×
507
                delete(cont.snatNodeInfoCache, node.ObjectMeta.Name)
×
508
                cont.log.Debug("Node deleted from snatNodeInfoCache: ", node.ObjectMeta.Name)
×
509
                nodeinfokey, _ := cache.MetaNamespaceKeyFunc(nodeinfo)
×
510
                cont.queueNodeInfoUpdateByKey(nodeinfokey)
×
511
        }
512

513
        if podnet, ok := cont.nodePodNetCache[node.ObjectMeta.Name]; ok {
2✔
514
                if podnet != nil && cont.podNetworkIps != nil {
2✔
515
                        if cont.podNetworkIps.V4 != nil && podnet.podNetIps.V4 != nil {
1✔
516
                                cont.podNetworkIps.V4.AddRanges(podnet.podNetIps.V4)
×
517
                        }
×
518
                        if cont.podNetworkIps.V4 != nil && podnet.podNetIps.V6 != nil {
1✔
519
                                cont.podNetworkIps.V6.AddRanges(podnet.podNetIps.V6)
×
520
                        }
×
521
                }
522
                delete(cont.nodePodNetCache, node.ObjectMeta.Name)
1✔
523
                cont.log.Debug("Node deleted from nodePodNetCache: ", node.ObjectMeta.Name)
1✔
524
        }
525

526
        delete(cont.nodeACIPodAnnot, node.ObjectMeta.Name)
1✔
527
        np, ok := obj.(*nodePodIf.NodePodIF)
1✔
528
        if !ok {
2✔
529
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
1✔
530
                if !ok {
2✔
531
                        cont.log.Error("Received unexpected object: ", obj)
1✔
532
                        return
1✔
533
                }
1✔
534
                np, ok = deletedState.Obj.(*nodePodIf.NodePodIF)
×
535
                if !ok {
×
536
                        cont.log.Error("DeletedFinalStateUnknown contained non-NodePodIF object: ", deletedState.Obj)
×
537
                        return
×
538
                }
×
539
        }
540
        env := cont.env.(*K8sEnvironment)
×
541
        nodepodifc1 := env.nodePodifClient
×
542
        if nodepodifc1 != nil {
×
543
                err := util.DeleteNodePodIfCR(*nodepodifc1, np.ObjectMeta.Name)
×
544
                if err != nil {
×
545
                        cont.log.Error("Could not delete the NodePodIF", np.ObjectMeta.Name)
×
546
                        return
×
547
                }
×
548
        }
549

550
        if cont.config.AciMultipod {
×
551
                delete(cont.nodeACIPod, node.ObjectMeta.Name)
×
552
        }
×
553
}
554

555
// must have index lock
556
func (cont *AciController) addPodToNode(nodename, key string) {
1✔
557
        existing, ok := cont.nodePodNetCache[nodename]
1✔
558
        if !ok {
2✔
559
                existing = newNodePodNetMeta()
1✔
560
                cont.nodePodNetCache[nodename] = existing
1✔
561
        }
1✔
562
        if _, ok = existing.nodePods[key]; !ok {
2✔
563
                existing.nodePods[key] = true
1✔
564
                cont.checkNodePodNet(nodename)
1✔
565
        }
1✔
566
}
567

568
// must have index lock
569
func (cont *AciController) removePodFromNode(nodename, key string) {
×
570
        if existing, ok := cont.nodePodNetCache[nodename]; ok {
×
571
                delete(existing.nodePods, key)
×
572
                cont.checkNodePodNet(nodename)
×
573
        }
×
574
}
575

576
func (cont *AciController) recomputePodNetAnnotation(podnet *nodePodNetMeta) {
1✔
577
        raw, err := json.Marshal(&podnet.podNetIps)
1✔
578
        if err != nil {
1✔
579
                cont.log.Error("Could not create node pod network ",
×
580
                        "annotation", err)
×
581
        }
×
582
        podnet.podNetIpsAnnotation = string(raw)
1✔
583
}
584

585
// must have index lock
586
func (cont *AciController) mergePodNet(podnet *nodePodNetMeta, existingAnnotation string, logger *logrus.Entry) {
1✔
587
        existing := &metadata.NetIps{}
1✔
588
        err := json.Unmarshal([]byte(existingAnnotation), existing)
1✔
589
        if err != nil {
1✔
590
                cont.log.Error("Could not parse existing pod network ",
×
591
                        "annotation", err)
×
592
                return
×
593
        }
×
594

595
        logger.Debug("Merging existing pod network: ", existingAnnotation)
1✔
596

1✔
597
        {
2✔
598
                v4 := ipam.NewFromRanges(podnet.podNetIps.V4)
1✔
599
                v4.AddRanges(existing.V4)
1✔
600
                annSize := v4.GetSize()
1✔
601

1✔
602
                // validate existing against configured range
1✔
603
                v4 = v4.Intersect(cont.configuredPodNetworkIps.V4)
1✔
604
                if v4.GetSize() != annSize {
2✔
605
                        logger.Warnf("intersect: %+v, config: %+v", v4, cont.configuredPodNetworkIps.V4)
1✔
606
                        logger.Warn("Existing annotation outside configured",
1✔
607
                                "range", existingAnnotation)
1✔
608
                }
1✔
609

610
                // mark the existing as allocated
611
                prevSize := cont.podNetworkIps.V4.GetSize()
1✔
612
                cont.podNetworkIps.V4.RemoveRanges(existing.V4)
1✔
613

1✔
614
                // verify allocation was successful
1✔
615
                newSize := cont.podNetworkIps.V4.GetSize()
1✔
616
                if (newSize + annSize) != prevSize {
2✔
617
                        logger.Warn("Existing annotation failed allocation: ",
1✔
618
                                existingAnnotation)
1✔
619
                }
1✔
620

621
                if len(v4.FreeList) > 0 {
2✔
622
                        podnet.podNetIps.V4 = v4.FreeList
1✔
623
                } else {
1✔
624
                        podnet.podNetIps.V4 = nil
×
625
                }
×
626
        }
627

628
        {
1✔
629
                v6 := ipam.NewFromRanges(podnet.podNetIps.V6)
1✔
630
                v6.AddRanges(existing.V6)
1✔
631
                v6 = v6.Intersect(cont.configuredPodNetworkIps.V6)
1✔
632
                cont.podNetworkIps.V6.RemoveRanges(existing.V6)
1✔
633
                if len(v6.FreeList) > 0 {
1✔
634
                        podnet.podNetIps.V6 = v6.FreeList
×
635
                } else {
1✔
636
                        podnet.podNetIps.V6 = nil
1✔
637
                }
1✔
638
        }
639

640
        cont.recomputePodNetAnnotation(podnet)
1✔
641
}
642

643
func (cont *AciController) allocateIpChunk(podnet *nodePodNetMeta, v4 bool) bool {
1✔
644
        var podnetipam, ipa *ipam.IpAlloc
1✔
645
        changed := false
1✔
646
        if v4 {
2✔
647
                podnetipam = ipam.NewFromRanges(podnet.podNetIps.V4)
1✔
648
                ipa = cont.podNetworkIps.V4
1✔
649
        } else {
2✔
650
                podnetipam = ipam.NewFromRanges(podnet.podNetIps.V6)
1✔
651
                ipa = cont.podNetworkIps.V6
1✔
652
        }
1✔
653
        size := podnetipam.GetSize()
1✔
654
        if int64(len(podnet.nodePods)) >
1✔
655
                size-int64(cont.config.PodIpPoolChunkSize)/2 {
2✔
656
                // we have half a chunk left or less; allocate a new chunk
1✔
657
                r, err := ipa.GetIpChunk(int64(cont.config.PodIpPoolChunkSize))
1✔
658
                if err != nil {
2✔
659
                        cont.log.Error("Could not allocate address chunk: ", err)
1✔
660
                } else {
2✔
661
                        podnetipam.AddRanges(r)
1✔
662
                        if v4 {
2✔
663
                                podnet.podNetIps.V4 = podnetipam.FreeList
1✔
664
                        } else {
2✔
665
                                podnet.podNetIps.V6 = podnetipam.FreeList
1✔
666
                        }
1✔
667
                        cont.recomputePodNetAnnotation(podnet)
1✔
668
                        changed = true
1✔
669
                }
670
        }
671
        return changed
1✔
672
}
673

674
// must have index lock
675
func (cont *AciController) checkNodePodNet(nodename string) {
1✔
676
        v4changed, v6changed := false, false
1✔
677
        if podnet, ok := cont.nodePodNetCache[nodename]; ok {
2✔
678
                if !cont.configuredPodNetworkIps.V4.Empty() {
2✔
679
                        v4changed = cont.allocateIpChunk(podnet, true)
1✔
680
                }
1✔
681
                if !cont.configuredPodNetworkIps.V6.Empty() {
2✔
682
                        v6changed = cont.allocateIpChunk(podnet, false)
1✔
683
                }
1✔
684
        }
685
        if v4changed || v6changed {
2✔
686
                go cont.env.NodePodNetworkChanged(nodename)
1✔
687
        }
1✔
688
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc