• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11703

13 Mar 2026 06:42AM UTC coverage: 62.818% (-0.1%) from 62.92%
11703

Pull #1686

travis-pro

web-flow
Merge a82785c84 into c594361b4
Pull Request #1686: Fix iptables runtime by setting XTABLES_LIBDIR and installing nftables

13475 of 21451 relevant lines covered (62.82%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.58
/pkg/hostagent/agent.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package hostagent
16

17
import (
18
        "context"
19
        "net"
20
        "os"
21
        "sync"
22
        "sync/atomic"
23
        "time"
24

25
        "github.com/containernetworking/cni/pkg/types"
26
        "github.com/fsnotify/fsnotify"
27
        "github.com/sirupsen/logrus"
28
        "github.com/vishvananda/netlink"
29
        "golang.org/x/time/rate"
30
        v1 "k8s.io/api/core/v1"
31
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32
        "k8s.io/apimachinery/pkg/util/wait"
33
        "k8s.io/client-go/kubernetes"
34
        "k8s.io/client-go/rest"
35
        "k8s.io/client-go/tools/cache"
36
        "k8s.io/client-go/util/workqueue"
37

38
        fabattclset "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned"
39
        fabattv1 "github.com/noironetworks/aci-containers/pkg/fabricattachment/clientset/versioned/typed/aci.fabricattachment/v1"
40
        crdclientset "github.com/noironetworks/aci-containers/pkg/gbpcrd/clientset/versioned"
41
        aciv1 "github.com/noironetworks/aci-containers/pkg/gbpcrd/clientset/versioned/typed/acipolicy/v1"
42
        "github.com/noironetworks/aci-containers/pkg/index"
43
        "github.com/noironetworks/aci-containers/pkg/ipam"
44
        md "github.com/noironetworks/aci-containers/pkg/metadata"
45
        nodepodifclset "github.com/noironetworks/aci-containers/pkg/nodepodif/clientset/versioned"
46
        nodepodifv1 "github.com/noironetworks/aci-containers/pkg/nodepodif/clientset/versioned/typed/acipolicy/v1"
47
        snatpolicy "github.com/noironetworks/aci-containers/pkg/snatpolicy/apis/aci.snat/v1"
48
)
49

50
// ACIContainersTaintName is the key of the node taint set by the controller.
// The host agent looks for this key when deciding whether to remove the taint
// from its own node.
const (
	ACIContainersTaintName string = "aci-containers-host/unavailable"
)
54

55
// GbpConfig is a package-level pointer to the GBP configuration.
// NOTE(review): it is not assigned anywhere in this part of the file; confirm
// where it is initialised before relying on it being non-nil.
var GbpConfig *GBPConfig
56

57
// HostAgent holds the per-node state of the ACI containers host agent: its
// logger, configuration and environment, the mutexes guarding shared maps,
// the endpoint/service/SNAT caches, the Kubernetes informers and API clients,
// and the rate-limited work queues that drive the sync processors.
//
// Instances are expected to be created with NewHostAgent, which allocates the
// maps and queues declared here.
type HostAgent struct {
	log    *logrus.Logger
	config *HostAgentConfig
	env    Environment

	// indexMutex guards, among other things, syncEnabled and
	// completedSyncTypes (see EnableSync and processSyncQueue).
	indexMutex  sync.Mutex
	vethMutex   sync.Mutex
	ipamMutex   sync.Mutex
	epfileMutex sync.Mutex
	// snatPolicyLabelMutex guards snatPolicyLabels.
	snatPolicyLabelMutex sync.RWMutex
	snatPolicyCacheMutex sync.RWMutex
	proactiveConfMutex   sync.Mutex

	opflexEps              map[string][]*opflexEndpoint
	opflexServices         map[string]*opflexService
	epMetadata             map[string]map[string]*md.ContainerMetadata
	podNetworkMetadata     map[string]map[string]map[string]*md.ContainerMetadata
	primaryNetworkName     string
	podIpToName            map[string]string
	cniToPodID             map[string]string
	podUidToName           map[string]string
	podToNetAttachDef      map[string][]string
	serviceEp              md.ServiceEndpoint
	crdClient              aciv1.AciV1Interface
	nodePodIFClient        nodepodifv1.AciV1Interface
	fabAttClient           fabattv1.AciV1Interface
	podInformer            cache.SharedIndexInformer
	endpointsInformer      cache.SharedIndexInformer
	serviceInformer        cache.SharedIndexInformer
	nodeInformer           cache.SharedIndexInformer
	nsInformer             cache.SharedIndexInformer
	netPolInformer         cache.SharedIndexInformer
	depInformer            cache.SharedIndexInformer
	rcInformer             cache.SharedIndexInformer
	snatGlobalInformer     cache.SharedIndexInformer
	controllerInformer     cache.SharedIndexInformer
	snatPolicyInformer     cache.SharedIndexInformer
	qosPolicyInformer      cache.SharedIndexInformer
	rdConfigInformer       cache.SharedIndexInformer
	qosPolPods             *index.PodSelectorIndex
	endpointSliceInformer  cache.SharedIndexInformer
	netPolPods             *index.PodSelectorIndex
	depPods                *index.PodSelectorIndex
	rcPods                 *index.PodSelectorIndex
	podNetAnnotation       string
	aciPodAnnotation       string
	nodeAciPodAnnotation   string
	podIps                 *ipam.IpCache
	usedIPs                map[string]string
	netAttDefInformer      cache.SharedIndexInformer
	nadVlanMapInformer     cache.SharedIndexInformer
	fabricVlanPoolInformer cache.SharedIndexInformer
	hppInformer            cache.SharedIndexInformer
	hppRemoteIpInformer    cache.SharedIndexInformer
	hppMoIndex             map[string][]*gbpBaseMo
	proactiveConfInformer  cache.SharedIndexInformer

	syncEnabled         bool
	opflexConfigWritten bool
	// Work queues; ScheduleSync routes each sync type to one of them.
	syncQueue           workqueue.RateLimitingInterface
	epSyncQueue         workqueue.RateLimitingInterface
	portSyncQueue       workqueue.RateLimitingInterface
	hppLocalMoSyncQueue workqueue.RateLimitingInterface
	// syncProcessors maps a sync type name to the function that performs it;
	// the function returns true when the sync should be requeued.
	syncProcessors map[string]func() bool

	ignoreOvsPorts        map[string][]string
	netNsFuncChan         chan func()
	vtepIP                string
	gbpServerIP           string
	opflexSnatGlobalInfos map[string][]*opflexSnatGlobalInfo
	opflexSnatLocalInfos  map[string]*opflexSnatLocalInfo
	// SNAT pods per SNAT policy.
	snatPods map[string]map[string]ResourceType
	// Object key -> SNAT policy -> set of resource types whose labels are
	// active for that policy.
	snatPolicyLabels map[string]map[string]map[ResourceType]struct{}
	snatPolicyCache  map[string]*snatpolicy.SnatPolicy
	rdConfig         *opflexRdConfig
	poster           *EventPoster
	ocServices       []opflexOcService // OpenShift services
	serviceEndPoints ServiceEndPointType
	// Service to pod UIDs, used to track EP files added with the cluster IP.
	servicetoPodUids map[string]map[string]struct{}
	// Reverse map used to get the service IPs from a pod UID.
	podtoServiceUids map[string]map[string][]string
	nodePodIfEPs     map[string]*opflexEndpoint
	// Integration test checker; Run skips updateResetConfFile when this is
	// non-nil.
	integ_test *string `json:",omitempty"`
	// Network attachment definition maps.
	netattdefmap            map[string]*NetworkAttachmentData
	netattdefifacemap       map[string]*NetworkAttachmentData
	deviceIdMap             map[string][]string
	nadVlanMap              map[string]*nadVlanMatchData
	fabricDiscoveryRegistry map[int]FabricDiscoveryAgent
	// Namespace and pool name to VLAN list.
	fabricVlanPoolMap map[string]map[string]string
	orphanNadMap      map[string]*NetworkAttachmentData
	// Pod info.
	podNameToTimeStamps map[string]*epTimeStamps
	// completedSyncTypes records which of the taint-gating sync types have
	// been processed at least once (written by processSyncQueue).
	completedSyncTypes map[string]struct{}
	// taintRemoved stores a bool; Run initialises it before any reader runs.
	taintRemoved atomic.Value
}
158

159
// ServiceEndPointType abstracts the source of service endpoint information
// used by the host agent. Init installs a *serviceEndpointSlice as the
// implementation.
type ServiceEndPointType interface {
	// InitClientInformer creates the backing informer from kubeClient.
	InitClientInformer(kubeClient *kubernetes.Clientset)
	// Run starts the backing informer; stopCh stops it.
	Run(stopCh <-chan struct{})
	// SetOpflexService fills ofas for the service as and port sp.
	// NOTE(review): the meaning of the bool result is defined by the
	// implementations, which are not visible here.
	SetOpflexService(ofas *opflexService, as *v1.Service,
		external bool, key string, sp *v1.ServicePort) bool
}
165

166
// serviceEndpoint is the service endpoint source backed by the agent's
// endpoints informer.
type serviceEndpoint struct {
	agent *HostAgent
}
169
// serviceEndpointSlice is the service endpoint source backed by the agent's
// endpoint slice informer.
type serviceEndpointSlice struct {
	agent *HostAgent
}
172

173
// InitClientInformer sets up the agent's endpoints informer from kubeClient
// by delegating to initEndpointsInformerFromClient.
func (sep *serviceEndpoint) InitClientInformer(kubeClient *kubernetes.Clientset) {
	sep.agent.initEndpointsInformerFromClient(kubeClient)
}
×
176

177
// InitClientInformer sets up the agent's endpoint slice informer from
// kubeClient by delegating to initEndpointSliceInformerFromClient.
func (seps *serviceEndpointSlice) InitClientInformer(kubeClient *kubernetes.Clientset) {
	seps.agent.initEndpointSliceInformerFromClient(kubeClient)
}
×
180

181
// Run starts the endpoints informer in its own goroutine and then waits for
// its cache to sync. The result of WaitForCacheSync is not inspected, so the
// call also returns if stopCh is closed before the sync completes.
func (sep *serviceEndpoint) Run(stopCh <-chan struct{}) {
	go sep.agent.endpointsInformer.Run(stopCh)
	cache.WaitForCacheSync(stopCh, sep.agent.endpointsInformer.HasSynced)
}
1✔
185

186
// Run starts the endpoint slice informer in its own goroutine and then waits
// for its cache to sync. The result of WaitForCacheSync is not inspected, so
// the call also returns if stopCh is closed before the sync completes.
func (seps *serviceEndpointSlice) Run(stopCh <-chan struct{}) {
	go seps.agent.endpointSliceInformer.Run(stopCh)
	cache.WaitForCacheSync(stopCh, seps.agent.endpointSliceInformer.HasSynced)
}
1✔
190

191
func NewHostAgent(config *HostAgentConfig, env Environment, log *logrus.Logger) *HostAgent {
1✔
192
        ha := &HostAgent{
1✔
193
                log:            log,
1✔
194
                config:         config,
1✔
195
                env:            env,
1✔
196
                opflexEps:      make(map[string][]*opflexEndpoint),
1✔
197
                opflexServices: make(map[string]*opflexService),
1✔
198
                epMetadata:     make(map[string]map[string]*md.ContainerMetadata),
1✔
199
                podIpToName:    make(map[string]string),
1✔
200
                cniToPodID:     make(map[string]string),
1✔
201
                podUidToName:   make(map[string]string),
1✔
202
                nodePodIfEPs:   make(map[string]*opflexEndpoint),
1✔
203

1✔
204
                podNameToTimeStamps: make(map[string]*epTimeStamps),
1✔
205

1✔
206
                podIps: ipam.NewIpCache(),
1✔
207

1✔
208
                ignoreOvsPorts: make(map[string][]string),
1✔
209

1✔
210
                netNsFuncChan:         make(chan func()),
1✔
211
                opflexSnatGlobalInfos: make(map[string][]*opflexSnatGlobalInfo),
1✔
212
                opflexSnatLocalInfos:  make(map[string]*opflexSnatLocalInfo),
1✔
213
                snatPods:              make(map[string]map[string]ResourceType),
1✔
214
                snatPolicyLabels:      make(map[string]map[string]map[ResourceType]struct{}),
1✔
215
                snatPolicyCache:       make(map[string]*snatpolicy.SnatPolicy),
1✔
216
                servicetoPodUids:      make(map[string]map[string]struct{}),
1✔
217
                podtoServiceUids:      make(map[string]map[string][]string),
1✔
218
                netattdefmap:          make(map[string]*NetworkAttachmentData),
1✔
219
                netattdefifacemap:     make(map[string]*NetworkAttachmentData),
1✔
220
                deviceIdMap:           make(map[string][]string),
1✔
221
                nadVlanMap:            make(map[string]*nadVlanMatchData),
1✔
222
                fabricVlanPoolMap:     make(map[string]map[string]string),
1✔
223
                orphanNadMap:          make(map[string]*NetworkAttachmentData),
1✔
224
                podToNetAttachDef:     make(map[string][]string),
1✔
225
                podNetworkMetadata:    make(map[string]map[string]map[string]*md.ContainerMetadata),
1✔
226
                completedSyncTypes:    make(map[string]struct{}),
1✔
227
                hppMoIndex:            make(map[string][]*gbpBaseMo),
1✔
228
                syncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
229
                        &workqueue.BucketRateLimiter{
1✔
230
                                Limiter: rate.NewLimiter(rate.Limit(10), int(10)),
1✔
231
                        }, "sync"),
1✔
232
                epSyncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
233
                        &workqueue.BucketRateLimiter{
1✔
234
                                Limiter: rate.NewLimiter(rate.Limit(10), int(10)),
1✔
235
                        }, "epsync"),
1✔
236
                portSyncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
237
                        &workqueue.BucketRateLimiter{
1✔
238
                                Limiter: rate.NewLimiter(rate.Limit(10), int(10)),
1✔
239
                        }, "portsync"),
1✔
240
                hppLocalMoSyncQueue: workqueue.NewNamedRateLimitingQueue(
1✔
241
                        &workqueue.BucketRateLimiter{
1✔
242
                                Limiter: rate.NewLimiter(rate.Limit(10), int(10)),
1✔
243
                        }, "hpplocalmosync"),
1✔
244
                ocServices: []opflexOcService{
1✔
245
                        {
1✔
246
                                RouterInternalDefault,
1✔
247
                                OpenShiftIngressNs,
1✔
248
                        },
1✔
249
                },
1✔
250
        }
1✔
251

1✔
252
        ha.syncProcessors = map[string]func() bool{
1✔
253
                "eps":           ha.syncEps,
1✔
254
                "services":      ha.syncServices,
1✔
255
                "opflexServer":  ha.syncOpflexServer,
1✔
256
                "snat":          ha.syncSnat,
1✔
257
                "snatnodeInfo":  ha.syncSnatNodeInfo,
1✔
258
                "rdconfig":      ha.syncRdConfig,
1✔
259
                "snatLocalInfo": ha.UpdateLocalInfoCr,
1✔
260
                "nodepodifs":    ha.syncNodePodIfs,
1✔
261
                "ports":         ha.syncPortsQ,
1✔
262
                "hpp":           ha.syncLocalHppMo}
1✔
263

1✔
264
        if ha.config.EPRegistry == "k8s" {
1✔
265
                cfg, err := rest.InClusterConfig()
×
266
                if err != nil {
×
267
                        log.Errorf("ERROR getting cluster config: %v", err)
×
268
                        return ha
×
269
                }
×
270
                if !ha.config.ChainedMode {
×
271
                        aciawClient, err := crdclientset.NewForConfig(cfg)
×
272
                        if err != nil {
×
273
                                log.Errorf("ERROR getting crd client for registry: %v", err)
×
274
                                return ha
×
275
                        }
×
276
                        ha.crdClient = aciawClient.AciV1()
×
277
                }
278
        }
279
        if !ha.config.ChainedMode {
2✔
280
                if ha.config.EnableNodePodIF {
1✔
281
                        cfg, err := rest.InClusterConfig()
×
282
                        if err != nil {
×
283
                                log.Errorf("ERROR getting cluster config: %v", err)
×
284
                                return ha
×
285
                        }
×
286
                        nodepodifClient, err := nodepodifclset.NewForConfig(cfg)
×
287
                        if err != nil {
×
288
                                log.Errorf("ERROR getting nodepodif client for enabling NodePodIF: %v", err)
×
289
                                return ha
×
290
                        }
×
291
                        ha.nodePodIFClient = nodepodifClient.AciV1()
×
292
                }
293
        }
294
        cfg, err := rest.InClusterConfig()
1✔
295
        if err != nil {
2✔
296
                log.Errorf("ERROR getting cluster config: %v", err)
1✔
297
                return ha
1✔
298
        }
1✔
299
        fabAttClient, err := fabattclset.NewForConfig(cfg)
×
300
        if err != nil {
×
301
                log.Errorf("ERROR getting fabric attachment client: %v", err)
×
302
                return ha
×
303
        }
×
304
        ha.fabAttClient = fabAttClient.AciV1()
×
305

×
306
        return ha
×
307
}
308

309
func addPodRoute(ipn types.IPNet, dev, src string) error {
×
310
        link, err := netlink.LinkByName(dev)
×
311
        if err != nil {
×
312
                return err
×
313
        }
×
314
        if err := netlink.LinkSetUp(link); err != nil {
×
315
                return err
×
316
        }
×
317
        ipsrc := net.ParseIP(src)
×
318
        dst := &net.IPNet{
×
319
                IP:   ipn.IP,
×
320
                Mask: ipn.Mask,
×
321
        }
×
322
        route := netlink.Route{LinkIndex: link.Attrs().Index, Dst: dst, Src: ipsrc}
×
323
        return netlink.RouteAdd(&route)
×
324
}
325

326
func (agent *HostAgent) ReadSnatPolicyLabel(key string) (map[string]map[ResourceType]struct{}, bool) {
1✔
327
        agent.snatPolicyLabelMutex.RLock()
1✔
328
        defer agent.snatPolicyLabelMutex.RUnlock()
1✔
329
        value, ok := agent.snatPolicyLabels[key]
1✔
330
        return value, ok
1✔
331
}
1✔
332

333
func (agent *HostAgent) WriteSnatPolicyLabel(key, policy string, res ResourceType) {
1✔
334
        agent.snatPolicyLabelMutex.Lock()
1✔
335
        defer agent.snatPolicyLabelMutex.Unlock()
1✔
336
        if _, ok := agent.snatPolicyLabels[key][policy]; !ok {
2✔
337
                agent.snatPolicyLabels[key][policy] = make(map[ResourceType]struct{})
1✔
338
        }
1✔
339
        agent.snatPolicyLabels[key][policy][res] = struct{}{}
1✔
340
}
341

342
// WriteNewSnatPolicyLabel stores a fresh, empty policy map under key while
// holding the write lock. Any entry already stored under key is replaced.
func (agent *HostAgent) WriteNewSnatPolicyLabel(key string) {
	agent.snatPolicyLabelMutex.Lock()
	defer agent.snatPolicyLabelMutex.Unlock()
	agent.snatPolicyLabels[key] = make(map[string]map[ResourceType]struct{})
}
1✔
347

348
// DeleteSnatPolicyLabelEntryResource removes resource type res from the set
// stored for policy under key, holding the write lock. It is a no-op when
// key, policy or res is not present (delete on a nil map does nothing).
func (agent *HostAgent) DeleteSnatPolicyLabelEntryResource(key, policy string, res ResourceType) {
	agent.snatPolicyLabelMutex.Lock()
	defer agent.snatPolicyLabelMutex.Unlock()
	delete(agent.snatPolicyLabels[key][policy], res)
}
×
353

354
// DeleteSnatPolicyLabelEntry removes the whole entry for policy stored under
// key, holding the write lock. It is a no-op when key or policy is absent.
func (agent *HostAgent) DeleteSnatPolicyLabelEntry(key, policy string) {
	agent.snatPolicyLabelMutex.Lock()
	defer agent.snatPolicyLabelMutex.Unlock()
	delete(agent.snatPolicyLabels[key], policy)
}
1✔
359

360
// DeleteSnatPolicyLabel removes everything stored under key, holding the
// write lock. It is a no-op when key is absent.
func (agent *HostAgent) DeleteSnatPolicyLabel(key string) {
	agent.snatPolicyLabelMutex.Lock()
	defer agent.snatPolicyLabelMutex.Unlock()
	delete(agent.snatPolicyLabels, key)
}
1✔
365

366
func (agent *HostAgent) DeleteMatchingSnatPolicyLabel(policy string) {
1✔
367
        agent.snatPolicyLabelMutex.Lock()
1✔
368
        defer agent.snatPolicyLabelMutex.Unlock()
1✔
369
        for key, v := range agent.snatPolicyLabels {
2✔
370
                if _, ok := v[policy]; ok {
2✔
371
                        delete(agent.snatPolicyLabels[key], policy)
1✔
372
                }
1✔
373
        }
374
}
375

376
func (agent *HostAgent) Init() {
×
377
        agent.log.Debug("Initializing endpoint CNI metadata")
×
378
        primaryNetwork := agent.config.CniNetwork
×
379
        if agent.config.ChainedMode {
×
380
                err := agent.LoadCniNetworks()
×
381
                if err != nil {
×
382
                        agent.log.Infof("%v", err)
×
383
                }
×
384
                primaryNetwork = agent.primaryNetworkName
×
385
                agent.LoadAdditionalNetworkMetadata()
×
386
        }
387

388
        err := md.LoadMetadata(agent.config.CniMetadataDir,
×
389
                primaryNetwork, &agent.epMetadata, agent.config.ChainedMode)
×
390
        if err != nil {
×
391
                panic(err.Error())
×
392
        }
393
        agent.log.Info("Loaded cached endpoint CNI metadata: ", len(agent.epMetadata))
×
394
        agent.buildUsedIPs()
×
395

×
396
        agent.serviceEndPoints = &serviceEndpointSlice{}
×
397
        agent.serviceEndPoints.(*serviceEndpointSlice).agent = agent
×
398
        agent.log.Info("Initializing ServiceEndpointSlices")
×
399

×
400
        err = agent.env.Init(agent)
×
401
        if err != nil {
×
402
                panic(err.Error())
×
403
        }
404
        if agent.config.ChainedMode {
×
405
                err = agent.FabricDiscoveryRegistryInit()
×
406
                if err != nil {
×
407
                        panic(err.Error())
×
408
                }
409
        }
410
}
411

412
func (agent *HostAgent) removeTaintIfNodeReady(node *v1.Node, taintKey string, clientset *kubernetes.Clientset) error {
×
413
        for _, condition := range node.Status.Conditions {
×
414
                if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
×
415
                        updatedTaints := []v1.Taint{}
×
416
                        for _, taint := range node.Spec.Taints {
×
417
                                if taint.Key != taintKey {
×
418
                                        updatedTaints = append(updatedTaints, taint)
×
419
                                }
×
420
                        }
421

422
                        node.Spec.Taints = updatedTaints
×
423
                        _, err := clientset.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
×
424
                        if err != nil {
×
425
                                return err
×
426
                        }
×
427
                        agent.log.Debugf("Removed taint %s from node %s", taintKey, node.Name)
×
428
                }
429
        }
430
        return nil
×
431
}
432

433
func (agent *HostAgent) ScheduleSync(syncType string) {
1✔
434
        if syncType == "eps" {
2✔
435
                agent.epSyncQueue.AddRateLimited(syncType)
1✔
436
        } else if syncType == "ports" {
3✔
437
                agent.portSyncQueue.AddRateLimited(syncType)
1✔
438
        } else if syncType == "hpp" {
2✔
439
                agent.hppLocalMoSyncQueue.AddRateLimited(syncType)
×
440
        } else {
1✔
441
                agent.syncQueue.AddRateLimited(syncType)
1✔
442
        }
1✔
443
}
444

445
// scheduleSyncEps schedules an "eps" sync and then a "nodepodifs" sync.
func (agent *HostAgent) scheduleSyncEps() {
	agent.ScheduleSync("eps")
	agent.scheduleSyncNodePodIfs()
}
1✔
449

450
// scheduleSyncServices schedules a "services" sync.
func (agent *HostAgent) scheduleSyncServices() {
	agent.ScheduleSync("services")
}
1✔
453

454
// scheduleSyncSnats schedules a "snat" sync.
func (agent *HostAgent) scheduleSyncSnats() {
	agent.ScheduleSync("snat")
}
1✔
457

458
// scheduleSyncOpflexServer schedules an "opflexServer" sync.
func (agent *HostAgent) scheduleSyncOpflexServer() {
	agent.ScheduleSync("opflexServer")
}
×
461
// scheduleSyncNodeInfo schedules a "snatnodeInfo" sync.
func (agent *HostAgent) scheduleSyncNodeInfo() {
	agent.ScheduleSync("snatnodeInfo")
}
1✔
464
// scheduleSyncRdConfig schedules an "rdconfig" sync.
func (agent *HostAgent) scheduleSyncRdConfig() {
	agent.ScheduleSync("rdconfig")
}
1✔
467
// scheduleSyncLocalInfo schedules a "snatLocalInfo" sync.
func (agent *HostAgent) scheduleSyncLocalInfo() {
	agent.ScheduleSync("snatLocalInfo")
}
1✔
470
// scheduleSyncNodePodIfs schedules a "nodepodifs" sync.
func (agent *HostAgent) scheduleSyncNodePodIfs() {
	agent.ScheduleSync("nodepodifs")
}
1✔
473
// scheduleSyncPorts schedules a "ports" sync, which ScheduleSync places on
// the dedicated port queue.
func (agent *HostAgent) scheduleSyncPorts() {
	agent.ScheduleSync("ports")
}
1✔
476
// scheduleSyncLocalHppMo schedules an "hpp" sync, which ScheduleSync places
// on the dedicated local HPP MO queue.
func (agent *HostAgent) scheduleSyncLocalHppMo() {
	agent.ScheduleSync("hpp")
}
×
479

480
func (agent *HostAgent) watchRebootConf(stopCh <-chan struct{}) {
×
481
        watcher, err := fsnotify.NewWatcher()
×
482
        if err != nil {
×
483
                panic(err)
×
484
        }
485
        defer watcher.Close()
×
486
        if err := watcher.Add("/usr/local/var/lib/opflex-agent-ovs/reboot-conf.d/reboot.conf"); err != nil {
×
487
                panic(err)
×
488
        }
489
        for {
×
490
                select {
×
491
                // watch for events
492
                case event := <-watcher.Events:
×
493
                        agent.log.Info("Reloading aci-containers-host because of an event in /usr/local/var/lib/opflex-agent-ovs/reboot-conf.d/reboot.conf : ", event)
×
494
                        os.Exit(0)
×
495

496
                        // watch for errors
497
                case err := <-watcher.Errors:
×
498
                        agent.log.Error("ERROR: ", err)
×
499

500
                case <-stopCh:
×
501
                        return
×
502
                }
503
        }
504
}
505

506
func (agent *HostAgent) runTickers(stopCh <-chan struct{}) {
1✔
507
        ticker := time.NewTicker(time.Second * 30)
1✔
508
        defer ticker.Stop()
1✔
509

1✔
510
        for {
2✔
511
                select {
1✔
512
                case <-ticker.C:
1✔
513
                        agent.updateOpflexConfig()
1✔
514
                case <-stopCh:
1✔
515
                        return
1✔
516
                }
517
        }
518
}
519

520
func (agent *HostAgent) checkSyncProcessorsCompletionStatus(stopCh <-chan struct{}) {
×
521
        ticker := time.NewTicker(time.Second)
×
522
        defer ticker.Stop()
×
523

×
524
        kubeClient := agent.env.(*K8sEnvironment).kubeClient
×
525
        for {
×
526
                select {
×
527
                case <-ticker.C:
×
528
                        node, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), os.Getenv("KUBERNETES_NODE_NAME"), metav1.GetOptions{})
×
529
                        if err != nil {
×
530
                                continue
×
531
                        }
532
                        for _, taint := range node.Spec.Taints {
×
533
                                if taint.Key == ACIContainersTaintName && taint.Effect == v1.TaintEffectNoSchedule {
×
534
                                        removeTaint := false
×
535
                                        if agent.taintRemoved.Load().(bool) {
×
536
                                                removeTaint = true
×
537
                                        } else {
×
538
                                                agent.indexMutex.Lock()
×
539
                                                count := len(agent.completedSyncTypes)
×
540
                                                agent.indexMutex.Unlock()
×
541
                                                if count == 5 {
×
542
                                                        removeTaint = true
×
543
                                                }
×
544
                                        }
545

546
                                        if removeTaint {
×
547
                                                err := agent.removeTaintIfNodeReady(node, ACIContainersTaintName, kubeClient)
×
548
                                                if err != nil {
×
549
                                                        agent.log.Errorf("Failed to remove taint: %v", err)
×
550
                                                        continue
×
551
                                                }
552
                                                agent.taintRemoved.Store(true)
×
553
                                        }
554
                                }
555
                        }
556
                case <-stopCh:
×
557
                        return
×
558
                }
559
        }
560
}
561

562
func (agent *HostAgent) processSyncQueue(queue workqueue.RateLimitingInterface,
563
        queueStop <-chan struct{}) {
1✔
564
        go wait.Until(func() {
2✔
565
                for {
2✔
566
                        syncType, quit := queue.Get()
1✔
567
                        if quit {
2✔
568
                                break
1✔
569
                        }
570

571
                        var requeue bool
1✔
572
                        if sType, ok := syncType.(string); ok {
2✔
573
                                if f, ok := agent.syncProcessors[sType]; ok {
2✔
574
                                        requeue = f()
1✔
575

1✔
576
                                        switch sType {
1✔
577
                                        case "services", "eps", "snat", "snatnodeInfo", "nodepodifs":
1✔
578
                                                if agent.taintRemoved.Load().(bool) {
2✔
579
                                                        break
1✔
580
                                                }
581
                                                agent.indexMutex.Lock()
×
582
                                                agent.completedSyncTypes[sType] = struct{}{}
×
583
                                                agent.indexMutex.Unlock()
×
584
                                        }
585
                                }
586
                        }
587
                        if requeue {
1✔
588
                                queue.AddRateLimited(syncType)
×
589
                        } else {
1✔
590
                                queue.Forget(syncType)
1✔
591
                        }
1✔
592
                        queue.Done(syncType)
1✔
593
                }
594
        }, time.Second, queueStop)
595
        <-queueStop
1✔
596
        queue.ShutDown()
1✔
597
}
598

599
func (agent *HostAgent) EnableSync() (changed bool) {
1✔
600
        changed = false
1✔
601
        agent.indexMutex.Lock()
1✔
602
        if !agent.syncEnabled {
2✔
603
                agent.syncEnabled = true
1✔
604
                changed = true
1✔
605
        }
1✔
606
        agent.indexMutex.Unlock()
1✔
607
        if changed {
2✔
608
                agent.log.Info("Enabling OpFlex endpoint and service sync")
1✔
609
                agent.scheduleSyncServices()
1✔
610
                agent.scheduleSyncEps()
1✔
611
                agent.scheduleSyncSnats()
1✔
612
                agent.scheduleSyncNodeInfo()
1✔
613
                agent.scheduleSyncNodePodIfs()
1✔
614
        }
1✔
615
        return
1✔
616
}
617

618
func (agent *HostAgent) Run(stopCh <-chan struct{}) {
1✔
619
        err := agent.populateSnatLocalInfos()
1✔
620
        if err != nil {
1✔
621
                agent.log.Error("Failed to populate opflexSnatLocalInfos ", err.Error())
×
622
                panic(err.Error())
×
623
        }
624
        if agent.integ_test == nil {
1✔
625
                err = agent.updateResetConfFile()
×
626
                if err != nil {
×
627
                        agent.log.Error("Failed to create reset.conf ", err.Error())
×
628
                        panic(err.Error())
×
629
                }
630
        }
631
        if agent.config.TaintNotReadyNode {
1✔
632
                agent.taintRemoved.Store(false)
×
633
                go agent.checkSyncProcessorsCompletionStatus(stopCh)
×
634
        } else {
1✔
635
                agent.taintRemoved.Store(true)
1✔
636
        }
1✔
637
        syncEnabled, err := agent.env.PrepareRun(stopCh)
1✔
638
        if err != nil {
1✔
639
                panic(err.Error())
×
640
        }
641
        if agent.config.OpFlexEndpointDir == "" ||
1✔
642
                agent.config.OpFlexServiceDir == "" ||
1✔
643
                agent.config.OpFlexSnatDir == "" {
2✔
644
                if agent.config.EnableHppDirect && agent.config.OpFlexNetPolDir == "" {
1✔
645
                        agent.log.Warn("OpFlex endpoint, service, snat or netpol directories not set")
×
646
                } else {
1✔
647
                        agent.log.Warn("OpFlex endpoint, service, snat directories not set")
1✔
648
                }
1✔
649
        } else {
1✔
650
                if syncEnabled {
2✔
651
                        agent.EnableSync()
1✔
652
                }
1✔
653
                go agent.processSyncQueue(agent.syncQueue, stopCh)
1✔
654
                go agent.processSyncQueue(agent.epSyncQueue, stopCh)
1✔
655
                go agent.processSyncQueue(agent.portSyncQueue, stopCh)
1✔
656
                go agent.processSyncQueue(agent.hppLocalMoSyncQueue, stopCh)
1✔
657
        }
658
        if agent.config.ChainedMode {
2✔
659
                agent.FabricDiscoveryCollectDiscoveryData(stopCh)
1✔
660
        }
1✔
661
        agent.log.Info("Starting endpoint RPC")
1✔
662
        err = agent.runEpRPC(stopCh)
1✔
663
        if err != nil {
1✔
664
                panic(err.Error())
×
665
        }
666

667
        agent.cleanupSetup()
1✔
668
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc