• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / sig-storage-local-static-provisioner / 16065277026

04 Jul 2025 03:37AM UTC coverage: 50.79% (-0.4%) from 51.195%
16065277026

push

github

web-flow
Merge pull request #501 from saidjawad/support_for_karpenter_start_up_taint

Support for removing start up taint

32 of 90 new or added lines in 4 files covered. (35.56%)

1 existing line in 1 file now uncovered.

996 of 1961 relevant lines covered (50.79%)

6.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

11.21
/pkg/controller/controller.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "fmt"
21
        "math/rand"
22
        "net/http"
23
        "time"
24

25
        "k8s.io/klog/v2"
26

27
        "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/cache"
28
        "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/common"
29
        "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/deleter"
30
        "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/discovery"
31
        nodetaint "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/node-taint"
32
        "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/populator"
33
        "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/util"
34

35
        v1 "k8s.io/api/core/v1"
36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/apiserver/pkg/server/healthz"
38
        "k8s.io/client-go/informers"
39
        "k8s.io/client-go/kubernetes"
40
        "k8s.io/client-go/kubernetes/scheme"
41
        v1core "k8s.io/client-go/kubernetes/typed/core/v1"
42
        "k8s.io/client-go/tools/record"
43
        "k8s.io/utils/mount"
44
)
45

46
// signal carries a stop request from a client to a running service.
// The client sends an acknowledgement channel over closing and then waits
// on that channel until the service reports that it has stopped.
type signal struct {
	// closing transports the per-request acknowledgement channel.
	closing chan chan struct{}
}
51

52
func newSignal() *signal {
1✔
53
        return &signal{
1✔
54
                closing: make(chan chan struct{}),
1✔
55
        }
1✔
56
}
1✔
57

58
func (s *signal) stop() {
1✔
59
        stopped := make(chan struct{})
1✔
60
        s.closing <- stopped
1✔
61
        <-stopped
1✔
62
}
1✔
63

64
func (s *signal) close() {
1✔
65
        close(s.closing)
1✔
66
}
1✔
67

68
// RunLocalController facilitates and manages the sync loop.
69
// It launches the main sync loop and if there is an updated configuration from the ConfigWatcher,
70
// it will inform the main sync loop to terminate and then will launch a new sync loop with the
71
// updated configuration.
72
func RunLocalController(configUpdate <-chan common.ProvisionerConfiguration, client *kubernetes.Clientset, ptable deleter.ProcTable, discoveryPeriod time.Duration, node *v1.Node, namespace, jobImage string, config common.ProvisionerConfiguration) {
×
73
        s := newSignal()
×
74
        defer s.close()
×
75

×
76
        startController := func(config common.ProvisionerConfiguration) {
×
77
                StartLocalController(s, client, ptable, discoveryPeriod, common.UserConfigFromProvisionerConfig(node, namespace, jobImage, config))
×
78
        }
×
79
        go startController(config)
×
80

×
81
        for {
×
82
                select {
×
83
                case newConfig := <-configUpdate:
×
84
                        s.stop()
×
85
                        go startController(newConfig)
×
86
                }
87
        }
88
}
89

90
// StartLocalController starts the sync loop for the local PV discovery and deleter
91
func StartLocalController(signal *signal, client *kubernetes.Clientset, ptable deleter.ProcTable, discoveryPeriod time.Duration, config *common.UserConfig) {
×
92
        klog.Info("Initializing volume cache\n")
×
93

×
94
        informerStopChan := make(chan struct{})
×
95
        jobControllerStopChan := make(chan struct{})
×
96

×
97
        var provisionerName string
×
98
        if config.UseNodeNameOnly {
×
99
                provisionerName = fmt.Sprintf("local-volume-provisioner-%v", config.Node.Name)
×
100
        } else {
×
101
                provisionerName = fmt.Sprintf("local-volume-provisioner-%v-%v", config.Node.Name, config.Node.UID)
×
102
        }
×
103

104
        broadcaster := record.NewBroadcaster()
×
105
        broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.CoreV1().RESTClient()).Events("")})
×
106
        recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: provisionerName})
×
107

×
108
        // We choose a random resync period between MinResyncPeriod and 2 *
×
109
        // MinResyncPeriod, so that local provisioners deployed on multiple nodes
×
110
        // at same time don't list the apiserver simultaneously.
×
111
        resyncPeriod := time.Duration(config.MinResyncPeriod.Seconds()*(1+rand.Float64())) * time.Second
×
112

×
113
        var err error
×
114
        volumeUtil, err := util.NewVolumeUtil()
×
115
        if err != nil {
×
116
                klog.Fatalf("Error initializing VolumeUtil: %v", err)
×
117
        }
×
118

119
        runtimeConfig := &common.RuntimeConfig{
×
120
                UserConfig:      config,
×
121
                Cache:           cache.NewVolumeCache(),
×
122
                VolUtil:         volumeUtil,
×
123
                APIUtil:         util.NewAPIUtil(client),
×
124
                Client:          client,
×
125
                Name:            provisionerName,
×
126
                Recorder:        recorder,
×
127
                Mounter:         mount.New("" /* default mount path */),
×
128
                InformerFactory: informers.NewSharedInformerFactory(client, resyncPeriod),
×
129
        }
×
130

×
131
        populator.NewPopulator(runtimeConfig)
×
132

×
133
        var jobController deleter.JobController
×
134
        if runtimeConfig.UseJobForCleaning {
×
135
                labels := map[string]string{common.NodeNameLabel: config.Node.Name}
×
136
                jobController, err = deleter.NewJobController(labels, runtimeConfig)
×
137
                if err != nil {
×
138
                        klog.Fatalf("Error initializing jobController: %v", err)
×
139
                }
×
140
                klog.Infof("Enabling Jobs based cleaning.")
×
141
        }
142
        cleanupTracker := &deleter.CleanupStatusTracker{ProcTable: ptable, JobController: jobController}
×
143

×
144
        discoverer, err := discovery.NewDiscoverer(runtimeConfig, cleanupTracker)
×
145
        if err != nil {
×
146
                klog.Fatalf("Error initializing discoverer: %v", err)
×
147
        }
×
148
        healthz.InstallPathHandler(http.DefaultServeMux, "/ready", discoverer.Readyz)
×
149

×
150
        deleter := deleter.NewDeleter(runtimeConfig, cleanupTracker)
×
151

×
152
        // Start informers after all event listeners are registered.
×
153
        runtimeConfig.InformerFactory.Start(informerStopChan)
×
154
        // Wait for all started informers' cache were synced.
×
155
        for v, synced := range runtimeConfig.InformerFactory.WaitForCacheSync(wait.NeverStop) {
×
156
                if !synced {
×
157
                        klog.Fatalf("Error syncing informer for %v", v)
×
158
                }
×
159
        }
160
        // Run controller logic.
161
        if jobController != nil {
×
162
                go jobController.Run(jobControllerStopChan)
×
163
        }
×
164
        klog.Info("Controller started\n")
×
NEW
165

×
NEW
166
        nodeTaintRemover := nodetaint.NewRemover(runtimeConfig)
×
NEW
167

×
168
        for {
×
169
                select {
×
170
                case stopped := <-signal.closing:
×
171
                        close(informerStopChan)
×
172
                        if jobController != nil {
×
173
                                close(jobControllerStopChan)
×
174
                        }
×
175
                        stopped <- struct{}{}
×
176
                        klog.Info("Controller stopped\n")
×
177
                        return
×
178
                default:
×
179
                        deleter.DeletePVs()
×
180
                        discoverer.DiscoverLocalVolumes()
×
NEW
181
                        if !nodeTaintRemover.ShouldRemoveTaint() && discoverer.Readyz.Check(nil) == nil {
×
NEW
182
                                nodeTaintRemover.RemoveTaintWithBackoff()
×
NEW
183
                        }
×
UNCOV
184
                        time.Sleep(discoveryPeriod)
×
185
                }
186
        }
187
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc