• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 8274

06 Dec 2023 10:05AM UTC coverage: 53.692% (-0.1%) from 53.79%
8274

Pull #1206

travis-pro

web-flow
Merge cbf87aef6 into afa25a6a5
Pull Request #1206: Added parameter enable-opflex-agent-reconnect

5 of 29 new or added lines in 3 files covered. (17.24%)

17 existing lines in 3 files now uncovered.

13263 of 24702 relevant lines covered (53.69%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.83
/pkg/controller/nodes.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// Handlers for node updates.
16

17
package controller
18

19
import (
20
        "crypto/rand"
21
        "encoding/json"
22
        "errors"
23
        "fmt"
24
        "net"
25
        "net/http"
26

27
        v1 "k8s.io/api/core/v1"
28
        kubeerr "k8s.io/apimachinery/pkg/api/errors"
29
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30
        "k8s.io/apimachinery/pkg/fields"
31
        "k8s.io/apimachinery/pkg/labels"
32
        "k8s.io/client-go/kubernetes"
33
        "k8s.io/client-go/tools/cache"
34

35
        "github.com/sirupsen/logrus"
36

37
        "github.com/noironetworks/aci-containers/pkg/apicapi"
38
        crdv1 "github.com/noironetworks/aci-containers/pkg/gbpcrd/apis/acipolicy/v1"
39
        "github.com/noironetworks/aci-containers/pkg/gbpserver/watchers"
40
        "github.com/noironetworks/aci-containers/pkg/ipam"
41
        "github.com/noironetworks/aci-containers/pkg/metadata"
42
        nodePodIf "github.com/noironetworks/aci-containers/pkg/nodepodif/apis/acipolicy/v1"
43
        "github.com/noironetworks/aci-containers/pkg/util"
44
)
45

46
// tunnelIDIncr is the step between consecutively allocated tunnel IDs;
// each allocated ID packs two tunnels.
const tunnelIDIncr = 2
49

50
type tunnelState struct {
51
        stateDriver  *watchers.K8sStateDriver
52
        nodeToTunnel map[string]int64
53
        nextID       int64
54
}
55

56
func (cont *AciController) initNodeInformerFromClient(
57
        kubeClient *kubernetes.Clientset) {
×
58
        cont.initNodeInformerBase(
×
59
                cache.NewListWatchFromClient(
×
60
                        kubeClient.CoreV1().RESTClient(), "nodes",
×
61
                        metav1.NamespaceAll, fields.Everything()))
×
62
}
×
63

64
func (cont *AciController) initNodeInformerBase(listWatch *cache.ListWatch) {
1✔
65
        cont.nodeIndexer, cont.nodeInformer = cache.NewIndexerInformer(
1✔
66
                listWatch, &v1.Node{}, 0,
1✔
67
                cache.ResourceEventHandlerFuncs{
1✔
68
                        AddFunc: func(obj interface{}) {
2✔
69
                                cont.syncPodNet(obj) // update cache
1✔
70
                                cont.nodeChanged(obj)
1✔
71
                        },
1✔
72
                        UpdateFunc: func(_ interface{}, obj interface{}) {
1✔
73
                                cont.nodeChanged(obj)
1✔
74
                        },
1✔
75
                        DeleteFunc: func(obj interface{}) {
1✔
76
                                cont.nodeDeleted(obj)
1✔
77
                        },
1✔
78
                },
79
                cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
80
        )
81
}
82

83
func apicNodeNetPol(name string, tenantName string,
84
        nodeIps map[string]bool) apicapi.ApicObject {
1✔
85
        hpp := apicapi.NewHostprotPol(tenantName, name)
1✔
86
        hppDn := hpp.GetDn()
1✔
87
        nodeSubj := apicapi.NewHostprotSubj(hppDn, "local-node")
1✔
88
        if len(nodeIps) > 0 {
2✔
89
                nodeSubjDn := nodeSubj.GetDn()
1✔
90
                outbound := apicapi.NewHostprotRule(nodeSubjDn, "allow-all-egress")
1✔
91
                outbound.SetAttr("direction", "egress")
1✔
92
                outbound.SetAttr("ethertype", "ipv4")
1✔
93
                outbound.SetAttr("connTrack", "normal")
1✔
94

1✔
95
                inbound := apicapi.NewHostprotRule(nodeSubjDn, "allow-all-ingress")
1✔
96
                inbound.SetAttr("direction", "ingress")
1✔
97
                inbound.SetAttr("ethertype", "ipv4")
1✔
98
                inbound.SetAttr("connTrack", "normal")
1✔
99

1✔
100
                for ip := range nodeIps {
2✔
101
                        outbound.AddChild(apicapi.NewHostprotRemoteIp(outbound.GetDn(), ip))
1✔
102
                        inbound.AddChild(apicapi.NewHostprotRemoteIp(inbound.GetDn(), ip))
1✔
103
                }
1✔
104

105
                nodeSubj.AddChild(inbound)
1✔
106
                nodeSubj.AddChild(outbound)
1✔
107
        }
108
        hpp.AddChild(nodeSubj)
1✔
109
        return hpp
1✔
110
}
111

112
func (cont *AciController) createNetPolForNode(node *v1.Node) {
1✔
113
        nodeIps := make(map[string]bool)
1✔
114
        for _, a := range node.Status.Addresses {
2✔
115
                if a.Address != "" &&
1✔
116
                        (a.Type == "InternalIP" || a.Type == "ExternalIP") {
2✔
117
                        nodeIps[a.Address] = true
1✔
118
                }
1✔
119
        }
120

121
        sgName := cont.aciNameForKey("node", node.Name)
1✔
122
        cont.apicConn.WriteApicObjects(sgName,
1✔
123
                apicapi.ApicSlice{
1✔
124
                        apicNodeNetPol(sgName, cont.config.AciPolicyTenant, nodeIps),
1✔
125
                })
1✔
126
}
127

128
func (cont *AciController) createServiceEndpoint(existing, ep *metadata.ServiceEndpoint, deviceMac, nodeName string) error {
1✔
129
        _, err := net.ParseMAC(deviceMac)
1✔
130
        if err == nil && deviceMac != "00:00:00:00:00:00" {
2✔
131
                ep.Mac = deviceMac
1✔
132
        } else {
2✔
133
                _, err := net.ParseMAC(existing.Mac)
1✔
134
                if err == nil {
2✔
135
                        ep.Mac = existing.Mac
1✔
136
                } else {
2✔
137
                        var mac net.HardwareAddr = make([]byte, 6)
1✔
138
                        _, err := rand.Read(mac)
1✔
139
                        if err != nil {
1✔
140
                                return err
×
141
                        }
×
142

143
                        mac[0] = (mac[0] & 254) | 2
1✔
144
                        ep.Mac = mac.String()
1✔
145
                }
146
        }
147

148
        if ep.Ipv4 == nil && existing.Ipv4 != nil &&
1✔
149
                cont.nodeServiceIps.V4.RemoveIp(existing.Ipv4) {
2✔
150
                ep.Ipv4 = existing.Ipv4
1✔
151
        }
1✔
152

153
        if ep.Ipv4 == nil {
2✔
154
                ipv4, err := cont.nodeServiceIps.V4.GetIp()
1✔
155
                if err == nil {
2✔
156
                        ep.Ipv4 = ipv4
1✔
157
                } else {
2✔
158
                        ep.Ipv4 = nil
1✔
159
                }
1✔
160
        }
161

162
        if ep.Ipv6 == nil && existing.Ipv6 != nil &&
1✔
163
                cont.nodeServiceIps.V6.RemoveIp(existing.Ipv6) {
1✔
164
                ep.Ipv6 = existing.Ipv6
×
165
        }
×
166

167
        if ep.Ipv6 == nil {
2✔
168
                ipv6, err := cont.nodeServiceIps.V6.GetIp()
1✔
169
                if err == nil {
2✔
170
                        ep.Ipv6 = ipv6
1✔
171
                } else {
2✔
172
                        ep.Ipv6 = nil
1✔
173
                }
1✔
174
        }
175

176
        if ep.Ipv4 == nil && ep.Ipv6 == nil {
2✔
177
                return errors.New("No IP addresses available for service endpoint")
1✔
178
        }
1✔
179

180
        if (ep.HealthGroupDn == "") && (cont.config.AciServiceMonitorInterval > 0) {
2✔
181
                name := cont.aciNameForKey("svc", nodeName)
1✔
182
                healthGroupObj := apicapi.NewVnsRedirectHealthGroup(cont.config.AciVrfTenant, name)
1✔
183
                ep.HealthGroupDn = healthGroupObj.GetDn()
1✔
184
                cont.apicConn.WriteApicObjects(name, apicapi.ApicSlice{healthGroupObj})
1✔
185
        }
1✔
186

187
        return nil
1✔
188
}
189

190
func (cont *AciController) nodeFullSync() {
1✔
191
        cache.ListAll(cont.nodeIndexer, labels.Everything(),
1✔
192
                func(nodeobj interface{}) {
2✔
193
                        cont.syncPodNet(nodeobj) // update the cache
1✔
194
                        cont.nodeChanged(nodeobj)
1✔
195
                })
1✔
196
}
197

198
// syncPodNet syncs in the podnets from the node object's annotation
199
func (cont *AciController) syncPodNet(obj interface{}) {
1✔
200
        cont.indexMutex.Lock()
1✔
201
        defer cont.indexMutex.Unlock()
1✔
202

1✔
203
        node := obj.(*v1.Node)
1✔
204
        logger := cont.log.WithFields(logrus.Fields{
1✔
205
                "Node": node.ObjectMeta.Name,
1✔
206
        })
1✔
207

1✔
208
        if node.ObjectMeta.Annotations == nil {
1✔
209
                return // nothing to sync
×
210
        }
×
211

212
        netval, ok := node.ObjectMeta.Annotations[metadata.PodNetworkRangeAnnotation]
1✔
213
        if !ok {
2✔
214
                return // nothing to sync
1✔
215
        }
1✔
216

217
        nodePodNet := newNodePodNetMeta()
1✔
218
        cont.mergePodNet(nodePodNet, netval, logger)
1✔
219
        cont.nodePodNetCache[node.ObjectMeta.Name] = nodePodNet
1✔
220
}
221

222
// maxof returns the larger of x1 and x2.
func maxof(x1, x2 int64) int64 {
	larger := x2
	if x1 > x2 {
		larger = x1
	}
	return larger
}
229

230
func (cont *AciController) getTunnelID(node *v1.Node) int64 {
1✔
231
        if cont.config.LBType == lbTypeAci {
2✔
232
                return 0
1✔
233
        }
1✔
234

235
        nodeIP := getNodeIP(node, v1.NodeInternalIP)
×
236
        if nodeIP == "" {
×
237
                cont.log.Errorf("Can't get IP for node %s", node.Name)
×
238
                return 0
×
239
        }
×
240

241
        if cont.tunnelGetter == nil {
×
242
                tunnelGetter := &tunnelState{}
×
243
                tunnelGetter.stateDriver = &watchers.K8sStateDriver{}
×
244
                err := tunnelGetter.stateDriver.Init(watchers.FieldTunnelID)
×
245
                if err != nil {
×
246
                        cont.log.Warnf("Can't get tunnelID for %s", node.Name)
×
247
                        return 0
×
248
                }
×
249

250
                gbps, err := tunnelGetter.stateDriver.Get()
×
251
                if err != nil {
×
252
                        cont.log.Warnf("Can't get tunnelID for %s", node.Name)
×
253
                        return 0
×
254
                }
×
255

256
                cont.tunnelGetter = tunnelGetter
×
257
                if gbps.Status.TunnelIDs == nil {
×
258
                        cont.log.Infof("Initializing tunnelIDs")
×
259
                        tunnelGetter.nodeToTunnel = make(map[string]int64)
×
260
                        cont.tunnelGetter.nextID = 0
×
261
                } else {
×
262
                        tunnelGetter.nodeToTunnel = gbps.Status.TunnelIDs
×
263
                        for _, val := range tunnelGetter.nodeToTunnel {
×
264
                                cont.tunnelGetter.nextID = maxof(cont.tunnelGetter.nextID, val)
×
265
                        }
×
266

267
                        cont.tunnelGetter.nextID += tunnelIDIncr
×
268
                }
269
        }
270

271
        id, found := cont.tunnelGetter.nodeToTunnel[nodeIP]
×
272
        if found {
×
273
                return id + int64(cont.config.CSRTunnelIDBase)
×
274
        }
×
275

276
        id = cont.tunnelGetter.nextID
×
277
        if id >= int64(cont.config.MaxCSRTunnels*tunnelIDIncr) {
×
278
                cont.log.Infof("Max tunnels %d reached", cont.config.MaxCSRTunnels)
×
279
                return 0
×
280
        }
×
281

282
        // each allocated id packs two tunnels.
283
        cont.tunnelGetter.nodeToTunnel[nodeIP] = id
×
284
        cont.tunnelGetter.nextID = id + tunnelIDIncr
×
285
        state := &crdv1.GBPSState{}
×
286
        state.Status.TunnelIDs = cont.tunnelGetter.nodeToTunnel
×
287
        cont.tunnelGetter.stateDriver.Update(state)
×
288
        return id + int64(cont.config.CSRTunnelIDBase)
×
289
}
290

291
func (cont *AciController) writeApicNode(node *v1.Node) {
1✔
292
        if cont.config.ChainedMode {
1✔
293
                return
×
294
        }
×
295
        tunnelID := cont.getTunnelID(node)
1✔
296
        key := cont.aciNameForKey("node-vmm", node.Name)
1✔
297
        aobj := apicapi.NewVmmInjectedHost(cont.vmmDomainProvider(),
1✔
298
                cont.config.AciVmmDomain, cont.config.AciVmmController,
1✔
299
                node.Name)
1✔
300
        aobj.SetAttr("mgmtIp", getNodeIP(node, v1.NodeInternalIP))
1✔
301
        aobj.SetAttr("os", node.Status.NodeInfo.OSImage)
1✔
302
        aobj.SetAttr("kernelVer", node.Status.NodeInfo.KernelVersion)
1✔
303
        if apicapi.ApicVersion >= "5.0" {
1✔
304
                aobj.SetAttr("id", fmt.Sprintf("%v", tunnelID))
×
305
        }
×
306
        cont.apicConn.WriteApicObjects(key, apicapi.ApicSlice{aobj})
1✔
307
}
308

309
func getNodeIP(node *v1.Node, aType v1.NodeAddressType) string {
1✔
310
        for _, a := range node.Status.Addresses {
2✔
311
                if a.Type == aType {
2✔
312
                        return a.Address
1✔
313
                }
1✔
314
        }
315

316
        return ""
1✔
317
}
318

319
func (cont *AciController) nodeChangedByName(nodeName string) {
1✔
320
        node, exists, err := cont.nodeIndexer.GetByKey(nodeName)
1✔
321
        if err != nil {
1✔
322
                cont.log.Error("Could not lookup node: ", err)
×
323
                return
×
324
        }
×
325
        if exists && node != nil {
2✔
326
                cont.nodeChanged(node)
1✔
327
        }
1✔
328
}
329

330
func (cont *AciController) nodeChanged(obj interface{}) {
1✔
331
        if cont.config.ChainedMode {
1✔
332
                return
×
333
        }
×
334
        cont.indexMutex.Lock()
1✔
335

1✔
336
        node := obj.(*v1.Node)
1✔
337
        logger := cont.log.WithFields(logrus.Fields{
1✔
338
                "Node": node.ObjectMeta.Name,
1✔
339
        })
1✔
340

1✔
341
        nodeUpdated := false
1✔
342
        if node.ObjectMeta.Annotations == nil {
1✔
343
                node.ObjectMeta.Annotations = make(map[string]string)
×
344
        }
×
345

346
        nodeMeta, metaok := cont.nodeServiceMetaCache[node.ObjectMeta.Name]
1✔
347
        epval, epok := node.ObjectMeta.Annotations[metadata.ServiceEpAnnotation]
1✔
348
        deviceMac, hasDevice := cont.deviceMacForNode(node.ObjectMeta.Name)
1✔
349

1✔
350
        if cont.nodeSyncEnabled && hasDevice {
2✔
351
                if !metaok {
2✔
352
                        nodeMeta = &nodeServiceMeta{}
1✔
353
                        cont.nodeServiceMetaCache[node.ObjectMeta.Name] = nodeMeta
1✔
354
                }
1✔
355

356
                existing := &metadata.ServiceEndpoint{}
1✔
357
                if epok {
2✔
358
                        err := json.Unmarshal([]byte(epval), existing)
1✔
359
                        if err != nil {
1✔
360
                                logger.WithFields(logrus.Fields{
×
361
                                        "epval": epval,
×
362
                                }).Warn("Could not parse existing node ",
×
363
                                        "service endpoint annotation: ", err)
×
364
                        }
×
365
                }
366

367
                cont.createServiceEndpoint(existing, &nodeMeta.serviceEp, deviceMac, node.ObjectMeta.Name)
1✔
368
                raw, err := json.Marshal(&nodeMeta.serviceEp)
1✔
369
                if err != nil {
1✔
370
                        logger.Error("Could not create node service endpoint annotation", err)
×
371
                } else {
1✔
372
                        serviceEpAnnotation := string(raw)
1✔
373
                        if !epok || serviceEpAnnotation != epval {
2✔
374
                                node.ObjectMeta.Annotations[metadata.ServiceEpAnnotation] =
1✔
375
                                        serviceEpAnnotation
1✔
376
                                nodeUpdated = true
1✔
377
                                cont.updateServicesForNode(node.ObjectMeta.Name)
1✔
378
                                cont.snatFullSync()
1✔
379
                        }
1✔
380
                }
381
        }
382

383
        nodePodNet, ok := cont.nodePodNetCache[node.ObjectMeta.Name]
1✔
384
        if !ok {
2✔
385
                nodePodNet = newNodePodNetMeta()
1✔
386
                cont.nodePodNetCache[node.ObjectMeta.Name] = nodePodNet
1✔
387
        }
1✔
388

389
        netval := node.ObjectMeta.Annotations[metadata.PodNetworkRangeAnnotation]
1✔
390
        if cont.nodeSyncEnabled {
2✔
391
                cont.checkNodePodNet(node.ObjectMeta.Name)
1✔
392
                if netval != nodePodNet.podNetIpsAnnotation {
2✔
393
                        logger.Debug("Overwriting existing pod network: ", netval)
1✔
394
                        node.ObjectMeta.Annotations[metadata.PodNetworkRangeAnnotation] =
1✔
395
                                nodePodNet.podNetIpsAnnotation
1✔
396
                        nodeUpdated = true
1✔
397
                }
1✔
398
        }
399

400
        if cont.config.EnableOpflexAgentReconnect {
1✔
NEW
401
                nodeAciPodAnnotation, ok := cont.nodeACIPodAnnot[node.ObjectMeta.Name]
×
NEW
402
                if ok {
×
NEW
403
                        nodeAciPod := nodeAciPodAnnotation.aciPod
×
NEW
404
                        aciPodAnn := node.ObjectMeta.Annotations[metadata.NodeAciPodAnnotation]
×
NEW
405
                        if cont.nodeSyncEnabled {
×
NEW
406
                                if aciPodAnn != nodeAciPod && nodeAciPod != "" {
×
NEW
407
                                        node.ObjectMeta.Annotations[metadata.NodeAciPodAnnotation] = nodeAciPod
×
NEW
408
                                        nodeUpdated = true
×
NEW
409
                                }
×
410
                        }
NEW
411
                } else {
×
NEW
412
                        var annot aciPodAnnot
×
NEW
413
                        cont.nodeACIPodAnnot[node.ObjectMeta.Name] = annot
×
UNCOV
414
                }
×
415
        }
416

417
        if cont.config.AciMultipod {
1✔
418
                nodeAciPodAnnot, ok := cont.nodeACIPod[node.ObjectMeta.Name]
×
419
                if ok {
×
420
                        nodeAciPod := nodeAciPodAnnot.aciPod
×
421
                        aciPodAnn := node.ObjectMeta.Annotations[metadata.AciPodAnnotation]
×
422
                        if cont.nodeSyncEnabled {
×
423
                                if aciPodAnn != nodeAciPod && nodeAciPod != "" {
×
424
                                        node.ObjectMeta.Annotations[metadata.AciPodAnnotation] = nodeAciPod
×
425
                                        nodeUpdated = true
×
426
                                }
×
427
                        }
428
                } else {
×
429
                        var annot aciPodAnnot
×
430
                        cont.nodeACIPod[node.ObjectMeta.Name] = annot
×
431
                }
×
432
        }
433
        cont.indexMutex.Unlock()
1✔
434

1✔
435
        cont.createNetPolForNode(node)
1✔
436
        cont.writeApicNode(node)
1✔
437

1✔
438
        if nodeUpdated {
2✔
439
                _, err := cont.updateNode(node)
1✔
440
                if err != nil {
1✔
441
                        var serr *kubeerr.StatusError
×
442
                        if errors.As(err, &serr) {
×
443
                                if serr.ErrStatus.Code == http.StatusConflict {
×
444
                                        logger.Debug("Conflict updating node; ",
×
445
                                                "will retry on next update")
×
446
                                        return
×
447
                                }
×
448
                        }
449
                        logger.Error("Failed to update node: ", err)
×
450
                } else {
1✔
451
                        logger.WithFields(logrus.Fields{
1✔
452
                                "ServiceEpAnnotation": node.
1✔
453
                                        ObjectMeta.Annotations[metadata.ServiceEpAnnotation],
1✔
454
                                "PodNetworkRangeAnnotation": node.
1✔
455
                                        ObjectMeta.Annotations[metadata.PodNetworkRangeAnnotation],
1✔
456
                        }).Info("Updated node annotations")
1✔
457
                }
1✔
458
        }
459
}
460

461
func (cont *AciController) nodeDeleted(obj interface{}) {
1✔
462
        if cont.config.ChainedMode {
1✔
463
                return
×
464
        }
×
465
        node, isNode := obj.(*v1.Node)
1✔
466
        if !isNode {
1✔
467
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
×
468
                if !ok {
×
469
                        cont.log.Error("Received unexpected object: ", obj)
×
470
                        return
×
471
                }
×
472
                node, ok = deletedState.Obj.(*v1.Node)
×
473
                if !ok {
×
474
                        cont.log.Error("DeletedFinalStateUnknown contained non-Node object: ", deletedState.Obj)
×
475
                        return
×
476
                }
×
477
        }
478
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("node", node.Name))
1✔
479
        cont.apicConn.ClearApicObjects(cont.aciNameForKey("node-vmm", node.Name))
1✔
480
        cont.log.Infof("Node deleted: %s", node.ObjectMeta.Name)
1✔
481

1✔
482
        cont.indexMutex.Lock()
1✔
483
        defer cont.indexMutex.Unlock()
1✔
484

1✔
485
        if existing, ok := cont.nodeServiceMetaCache[node.ObjectMeta.Name]; ok {
2✔
486
                if existing.serviceEp.Ipv4 != nil {
2✔
487
                        cont.nodeServiceIps.V4.AddIp(existing.serviceEp.Ipv4)
1✔
488
                }
1✔
489
                if existing.serviceEp.Ipv6 != nil {
1✔
490
                        cont.nodeServiceIps.V6.AddIp(existing.serviceEp.Ipv6)
×
491
                }
×
492
        }
493
        delete(cont.nodeServiceMetaCache, node.ObjectMeta.Name)
1✔
494
        cont.updateServicesForNode(node.ObjectMeta.Name)
1✔
495
        cont.snatFullSync()
1✔
496
        if _, ok := cont.snatNodeInfoCache[node.ObjectMeta.Name]; ok {
1✔
497
                env := cont.env.(*K8sEnvironment)
×
498
                nodeinfocl := env.nodeInfoClient
×
499
                //TODO Add reconcile here on failure
×
500
                if nodeinfocl != nil {
×
501
                        err := util.DeleteNodeInfoCR(*nodeinfocl, node.ObjectMeta.Name)
×
502
                        if err != nil {
×
503
                                cont.log.Error("Could not delete the NodeInfo", node.ObjectMeta.Name)
×
504
                                return
×
505
                        }
×
506
                        cont.log.Debug("Successfully Deleted NodeInfoCR for node: ", node.ObjectMeta.Name)
×
507
                }
508
                nodeinfo := cont.snatNodeInfoCache[node.ObjectMeta.Name]
×
509
                delete(cont.snatNodeInfoCache, node.ObjectMeta.Name)
×
510
                cont.log.Debug("Node deleted from snatNodeInfoCache: ", node.ObjectMeta.Name)
×
511
                nodeinfokey, _ := cache.MetaNamespaceKeyFunc(nodeinfo)
×
512
                cont.queueNodeInfoUpdateByKey(nodeinfokey)
×
513
        }
514

515
        if podnet, ok := cont.nodePodNetCache[node.ObjectMeta.Name]; ok {
2✔
516
                if podnet != nil && cont.podNetworkIps != nil {
2✔
517
                        if cont.podNetworkIps.V4 != nil && podnet.podNetIps.V4 != nil {
1✔
518
                                cont.podNetworkIps.V4.AddRanges(podnet.podNetIps.V4)
×
519
                        }
×
520
                        if cont.podNetworkIps.V4 != nil && podnet.podNetIps.V6 != nil {
1✔
521
                                cont.podNetworkIps.V6.AddRanges(podnet.podNetIps.V6)
×
522
                        }
×
523
                }
524
                delete(cont.nodePodNetCache, node.ObjectMeta.Name)
1✔
525
                cont.log.Debug("Node deleted from nodePodNetCache: ", node.ObjectMeta.Name)
1✔
526
        }
527

528
        if cont.config.EnableOpflexAgentReconnect {
1✔
NEW
529
                delete(cont.nodeACIPodAnnot, node.ObjectMeta.Name)
×
NEW
530
        }
×
531
        if cont.config.AciMultipod {
1✔
NEW
532
                delete(cont.nodeACIPod, node.ObjectMeta.Name)
×
NEW
533
        }
×
534

535
        np, ok := obj.(*nodePodIf.NodePodIF)
1✔
536
        if !ok {
2✔
537
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
1✔
538
                if !ok {
2✔
539
                        cont.log.Error("Received unexpected object: ", obj)
1✔
540
                        return
1✔
541
                }
1✔
542
                np, ok = deletedState.Obj.(*nodePodIf.NodePodIF)
×
543
                if !ok {
×
544
                        cont.log.Error("DeletedFinalStateUnknown contained non-NodePodIF object: ", deletedState.Obj)
×
545
                        return
×
546
                }
×
547
        }
548
        env := cont.env.(*K8sEnvironment)
×
549
        nodepodifc1 := env.nodePodifClient
×
550
        if nodepodifc1 != nil {
×
551
                err := util.DeleteNodePodIfCR(*nodepodifc1, np.ObjectMeta.Name)
×
552
                if err != nil {
×
553
                        cont.log.Error("Could not delete the NodePodIF", np.ObjectMeta.Name)
×
554
                        return
×
555
                }
×
556
        }
557
}
558

559
// must have index lock
560
func (cont *AciController) addPodToNode(nodename, key string) {
1✔
561
        existing, ok := cont.nodePodNetCache[nodename]
1✔
562
        if !ok {
2✔
563
                existing = newNodePodNetMeta()
1✔
564
                cont.nodePodNetCache[nodename] = existing
1✔
565
        }
1✔
566
        if _, ok = existing.nodePods[key]; !ok {
2✔
567
                existing.nodePods[key] = true
1✔
568
                cont.checkNodePodNet(nodename)
1✔
569
        }
1✔
570
}
571

572
// must have index lock
573
func (cont *AciController) removePodFromNode(nodename, key string) {
×
574
        if existing, ok := cont.nodePodNetCache[nodename]; ok {
×
575
                delete(existing.nodePods, key)
×
576
                cont.checkNodePodNet(nodename)
×
577
        }
×
578
}
579

580
func (cont *AciController) recomputePodNetAnnotation(podnet *nodePodNetMeta) {
1✔
581
        raw, err := json.Marshal(&podnet.podNetIps)
1✔
582
        if err != nil {
1✔
583
                cont.log.Error("Could not create node pod network ",
×
584
                        "annotation", err)
×
585
        }
×
586
        podnet.podNetIpsAnnotation = string(raw)
1✔
587
}
588

589
// must have index lock
590
func (cont *AciController) mergePodNet(podnet *nodePodNetMeta, existingAnnotation string, logger *logrus.Entry) {
1✔
591
        existing := &metadata.NetIps{}
1✔
592
        err := json.Unmarshal([]byte(existingAnnotation), existing)
1✔
593
        if err != nil {
1✔
594
                cont.log.Error("Could not parse existing pod network ",
×
595
                        "annotation", err)
×
596
                return
×
597
        }
×
598

599
        logger.Debug("Merging existing pod network: ", existingAnnotation)
1✔
600

1✔
601
        {
2✔
602
                v4 := ipam.NewFromRanges(podnet.podNetIps.V4)
1✔
603
                v4.AddRanges(existing.V4)
1✔
604
                annSize := v4.GetSize()
1✔
605

1✔
606
                // validate existing against configured range
1✔
607
                v4 = v4.Intersect(cont.configuredPodNetworkIps.V4)
1✔
608
                if v4.GetSize() != annSize {
2✔
609
                        logger.Warnf("intersect: %+v, config: %+v", v4, cont.configuredPodNetworkIps.V4)
1✔
610
                        logger.Warn("Existing annotation outside configured",
1✔
611
                                "range", existingAnnotation)
1✔
612
                }
1✔
613

614
                // mark the existing as allocated
615
                prevSize := cont.podNetworkIps.V4.GetSize()
1✔
616
                cont.podNetworkIps.V4.RemoveRanges(existing.V4)
1✔
617

1✔
618
                // verify allocation was successful
1✔
619
                newSize := cont.podNetworkIps.V4.GetSize()
1✔
620
                if (newSize + annSize) != prevSize {
2✔
621
                        logger.Warn("Existing annotation failed allocation: ",
1✔
622
                                existingAnnotation)
1✔
623
                }
1✔
624

625
                if len(v4.FreeList) > 0 {
2✔
626
                        podnet.podNetIps.V4 = v4.FreeList
1✔
627
                } else {
1✔
628
                        podnet.podNetIps.V4 = nil
×
629
                }
×
630
        }
631

632
        {
1✔
633
                v6 := ipam.NewFromRanges(podnet.podNetIps.V6)
1✔
634
                v6.AddRanges(existing.V6)
1✔
635
                v6 = v6.Intersect(cont.configuredPodNetworkIps.V6)
1✔
636
                cont.podNetworkIps.V6.RemoveRanges(existing.V6)
1✔
637
                if len(v6.FreeList) > 0 {
1✔
638
                        podnet.podNetIps.V6 = v6.FreeList
×
639
                } else {
1✔
640
                        podnet.podNetIps.V6 = nil
1✔
641
                }
1✔
642
        }
643

644
        cont.recomputePodNetAnnotation(podnet)
1✔
645
}
646

647
func (cont *AciController) allocateIpChunk(podnet *nodePodNetMeta, v4 bool) bool {
1✔
648
        var podnetipam, ipa *ipam.IpAlloc
1✔
649
        changed := false
1✔
650
        if v4 {
2✔
651
                podnetipam = ipam.NewFromRanges(podnet.podNetIps.V4)
1✔
652
                ipa = cont.podNetworkIps.V4
1✔
653
        } else {
2✔
654
                podnetipam = ipam.NewFromRanges(podnet.podNetIps.V6)
1✔
655
                ipa = cont.podNetworkIps.V6
1✔
656
        }
1✔
657
        size := podnetipam.GetSize()
1✔
658
        if int64(len(podnet.nodePods)) >
1✔
659
                size-int64(cont.config.PodIpPoolChunkSize)/2 {
2✔
660
                // we have half a chunk left or less; allocate a new chunk
1✔
661
                r, err := ipa.GetIpChunk(int64(cont.config.PodIpPoolChunkSize))
1✔
662
                if err != nil {
2✔
663
                        cont.log.Error("Could not allocate address chunk: ", err)
1✔
664
                } else {
2✔
665
                        podnetipam.AddRanges(r)
1✔
666
                        if v4 {
2✔
667
                                podnet.podNetIps.V4 = podnetipam.FreeList
1✔
668
                        } else {
2✔
669
                                podnet.podNetIps.V6 = podnetipam.FreeList
1✔
670
                        }
1✔
671
                        cont.recomputePodNetAnnotation(podnet)
1✔
672
                        changed = true
1✔
673
                }
674
        }
675
        return changed
1✔
676
}
677

678
// must have index lock
679
func (cont *AciController) checkNodePodNet(nodename string) {
1✔
680
        v4changed, v6changed := false, false
1✔
681
        if podnet, ok := cont.nodePodNetCache[nodename]; ok {
2✔
682
                if !cont.configuredPodNetworkIps.V4.Empty() {
2✔
683
                        v4changed = cont.allocateIpChunk(podnet, true)
1✔
684
                }
1✔
685
                if !cont.configuredPodNetworkIps.V6.Empty() {
2✔
686
                        v6changed = cont.allocateIpChunk(podnet, false)
1✔
687
                }
1✔
688
        }
689
        if v4changed || v6changed {
2✔
690
                go cont.env.NodePodNetworkChanged(nodename)
1✔
691
        }
1✔
692
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc