• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

turbonomic / kubeturbo / #607817138

11 Aug 2023 06:30PM UTC coverage: 41.683%. First build
#607817138

Pull #918

travis-ci

Pull Request #918: [TRB-44977] Pre-Scale Execution Condition Checks for Safe Node Scalin…

29 of 29 new or added lines in 2 files covered. (100.0%)

7350 of 17633 relevant lines covered (41.68%)

48212.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.74
/cmd/kubeturbo/app/kubeturbo_builder.go
1
package app
2

3
import (
4
        "context"
5
        "fmt"
6
        "net"
7
        "net/http"
8
        "net/http/pprof"
9
        "os"
10
        "os/signal"
11
        "strconv"
12
        "sync"
13
        "syscall"
14
        "time"
15

16
        set "github.com/deckarep/golang-set"
17
        "github.com/fsnotify/fsnotify"
18
        "github.com/golang/glog"
19
        osclient "github.com/openshift/client-go/apps/clientset/versioned"
20
        machineClient "github.com/openshift/client-go/machine/clientset/versioned"
21
        "github.com/prometheus/client_golang/prometheus/promhttp"
22
        "github.com/spf13/pflag"
23
        "github.com/spf13/viper"
24
        apiv1 "k8s.io/api/core/v1"
25
        apierrors "k8s.io/apimachinery/pkg/api/errors"
26
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
28
        "k8s.io/apimachinery/pkg/runtime"
29
        "k8s.io/apimachinery/pkg/runtime/schema"
30
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31
        versionhelper "k8s.io/apimachinery/pkg/version"
32
        "k8s.io/apiserver/pkg/server/healthz"
33
        utilfeature "k8s.io/apiserver/pkg/util/feature"
34
        "k8s.io/client-go/dynamic"
35
        "k8s.io/client-go/kubernetes"
36
        "k8s.io/client-go/kubernetes/scheme"
37
        v1core "k8s.io/client-go/kubernetes/typed/core/v1"
38
        restclient "k8s.io/client-go/rest"
39
        "k8s.io/client-go/tools/clientcmd"
40
        "k8s.io/client-go/tools/record"
41
        runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
42

43
        gitopsv1alpha1 "github.ibm.com/turbonomic/turbo-gitops/api/v1alpha1"
44
        policyv1alpha1 "github.ibm.com/turbonomic/turbo-policy/api/v1alpha1"
45

46
        kubeturbo "github.ibm.com/turbonomic/kubeturbo/pkg"
47
        "github.ibm.com/turbonomic/kubeturbo/pkg/action/executor/gitops"
48
        "github.ibm.com/turbonomic/kubeturbo/pkg/discovery/configs"
49
        "github.ibm.com/turbonomic/kubeturbo/pkg/discovery/processor"
50
        nodeUtil "github.ibm.com/turbonomic/kubeturbo/pkg/discovery/util"
51
        "github.ibm.com/turbonomic/kubeturbo/pkg/discovery/worker"
52
        agg "github.ibm.com/turbonomic/kubeturbo/pkg/discovery/worker/aggregation"
53
        "github.ibm.com/turbonomic/kubeturbo/pkg/features"
54
        "github.ibm.com/turbonomic/kubeturbo/pkg/kubeclient"
55
        "github.ibm.com/turbonomic/kubeturbo/pkg/resourcemapping"
56
        "github.ibm.com/turbonomic/kubeturbo/pkg/util"
57
        "github.ibm.com/turbonomic/kubeturbo/test/flag"
58
)
59

60
const (
	// KubeturboPort is the default port for the vmt service server.
	KubeturboPort = 10265
	// Defaults for talking to the kubelet.
	DefaultKubeletPort  = 10255
	DefaultKubeletHttps = false
	// Defaults for VM stitching in the supply chain (see VMTServer.VMPriority / VMIsBase).
	defaultVMPriority = -1
	defaultVMIsBase   = true
	// Default full-discovery interval, in seconds.
	defaultDiscoveryIntervalSec = 600
	// Defaults for target validation worker pool.
	DefaultValidationWorkers = 10
	DefaultValidationTimeout = 60
	// Defaults for the discovery worker pool.
	DefaultDiscoveryWorkers    = 10
	DefaultDiscoveryTimeoutSec = 180
	// Defaults for kubelet resource-usage sampling between full discoveries.
	DefaultDiscoverySamples           = 10
	DefaultDiscoverySampleIntervalSec = 60
	// Default interval, in minutes, for garbage-collecting leaked pods.
	DefaultGCIntervalMin = 10
	// Default number of pod-readiness retries before an action gives up.
	DefaultReadinessRetryThreshold = 60
)
77

78
var (
	// defaultSccSupport allows every SCC for action execution by default.
	defaultSccSupport = []string{"*"}

	// these variables will be deprecated. Keep it here for backward compatibility only
	k8sVersion        = "1.8"
	noneSchedulerName = "turbo-no-scheduler"

	// custom resource scheme for controller runtime client; populated in init().
	customScheme = runtime.NewScheme()
)
88

89
// cleanUp is a shutdown hook run during Kubeturbo termination (see handleExit).
type cleanUp func()
90

1✔
91
func init() {
	// Add registered custom types (turbo-policy and turbo-gitops CRDs) to the
	// custom scheme so the controller-runtime client can handle them.
	utilruntime.Must(policyv1alpha1.AddToScheme(customScheme))
	utilruntime.Must(gitopsv1alpha1.AddToScheme(customScheme))
}
96

97
// VMTServer has all the context and params needed to run a Scheduler
// TODO: leaderElection is disabled now because of dependency problems.
type VMTServer struct {
	// Port is the port kubeturbo's http service (healthz/debug/metrics) listens on.
	Port int
	// Address is the IP the http service binds to.
	Address string
	// Master is the address of the Kubernetes API server (overrides kubeconfig).
	Master string
	// K8sTAPSpec is the path to the Turbonomic TAP service config file.
	K8sTAPSpec string
	// TestingFlagPath is the path to the testing flag file.
	TestingFlagPath string
	// KubeConfig is the path to a kubeconfig file.
	KubeConfig string
	// BindPodsQPS / BindPodsBurst — not referenced in this file; TODO confirm usage elsewhere.
	BindPodsQPS          float32
	BindPodsBurst        int
	// DiscoveryIntervalSec is the full discovery interval in seconds.
	DiscoveryIntervalSec int
	// K8sTAPService is the running TAP service instance, set in Run().
	K8sTAPService        kubeturbo.K8sTAPService

	// LeaderElection componentconfig.LeaderElectionConfiguration

	// EnableProfiling enables the pprof (and metrics) endpoints on the http service.
	EnableProfiling bool

	// To stitch the Nodes in Kubernetes cluster with the VM from the underlying cloud or
	// hypervisor infrastructure: either use VM UUID or VM IP.
	// If the underlying infrastructure is VMWare, AWS instances, or Azure instances, VM's UUID is used.
	UseUUID bool

	// VMPriority: priority of VM in supplyChain definition from kubeturbo, should be less than 0;
	VMPriority int32
	// VMIsBase: Is VM is the base template from kubeturbo, when stitching with other VM probes, should be false;
	VMIsBase bool

	// Kubelet related config
	KubeletPort          int
	EnableKubeletHttps   bool
	UseNodeProxyEndpoint bool

	// The cluster processor related config
	ValidationWorkers int
	ValidationTimeout int

	// Discovery related config
	DiscoveryWorkers    int
	DiscoveryTimeoutSec int

	// Data sampling discovery related config
	DiscoverySamples           int
	DiscoverySampleIntervalSec int

	// Garbage collection (leaked pods) interval config
	GCIntervalMin int

	// Number of workload controller items the list api call should request for
	ItemsPerListQuery int

	// The Openshift SCC list allowed for action execution
	sccSupport []string

	// Injected Cluster Key to enable pod move across cluster
	ClusterKeyInjected string

	// Force the use of self-signed certificates.
	// The default is true.
	ForceSelfSignedCerts bool

	// Don't try to move pods which have volumes attached
	// If set to false kubeturbo can still try to move such pods.
	FailVolumePodMoves bool

	// Try to update namespace quotas to allow pod moves when one or
	// more quota(s) is/are full.
	UpdateQuotaToAllowMoves bool

	// The Cluster API namespace
	ClusterAPINamespace string

	// Busybox image uri used for cpufreq getter job (deprecated in favor of
	// CpuFrequencyGetterImage; see ensureBusyboxImageBackwardCompatibility).
	BusyboxImage string

	// Name of the secret that stores the image pull credentials of cpu freq getter job image
	BusyboxImagePullSecret string

	// CpufreqJobExcludeNodeLabels is used to specify node labels for nodes to be
	// excluded from running cpufreq job
	CpufreqJobExcludeNodeLabels string

	// Strategy to aggregate Container utilization data on ContainerSpec entity
	containerUtilizationDataAggStrategy string
	// Strategy to aggregate Container usage data on ContainerSpec entity
	containerUsageDataAggStrategy string
	// Total number of retrys. When a pod is not ready, Kubeturbo will try failureThreshold times before giving up
	readinessRetryThreshold int
	// Git configuration for gitops based action execution
	gitConfig gitops.GitConfig

	// Cpu frequency getter, used to replace busybox
	CpuFrequencyGetterImage string
	// Name of the secret that stores the image pull credentials of cpu freq getter job image
	CpuFrequencyGetterPullSecret string
	// Cleanup resources created in the SCC impersonation
	CleanupSccRelatedResources bool
	// Skip creating resources for the SCC impersonation
	SkipCreatingSccRelatedResources bool
}
×
197

×
198
// NewVMTServer creates a new VMTServer with default parameters
×
199
func NewVMTServer() *VMTServer {
×
200
        s := VMTServer{
×
201
                Port:       KubeturboPort,
×
202
                Address:    "127.0.0.1",
×
203
                VMPriority: defaultVMPriority,
×
204
                VMIsBase:   defaultVMIsBase,
205
        }
206
        return &s
1✔
207
}
1✔
208

1✔
209
// AddFlags adds flags for a specific VMTServer to the specified FlagSet
1✔
210
func (s *VMTServer) AddFlags(fs *pflag.FlagSet) {
1✔
211
        fs.StringVar(&s.ClusterKeyInjected, "cluster-key-injected", "", "Injected cluster key to enable pod move across cluster")
1✔
212
        fs.IntVar(&s.Port, "port", s.Port, "The port that kubeturbo's http service runs on.")
1✔
213
        fs.StringVar(&s.Address, "ip", s.Address, "the ip address that kubeturbo's http service runs on.")
1✔
214
        // TODO: The flagset that is included by vendoring k8s uses the same names i.e. "master" and "kubeconfig".
1✔
215
        // This for some reason conflicts with the names introduced by kubeturbo after upgrading the k8s vendored code
1✔
216
        // to version 1.19.1. Right now we have changed the names of kubeturbo flags as a quick fix. These flags are
1✔
217
        // not user facing and are useful only when running kubeturbo outside the cluster. Find a better solution
1✔
218
        // when need be.
1✔
219
        fs.StringVar(&s.Master, "k8s-master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig).")
1✔
220
        fs.StringVar(&s.K8sTAPSpec, "turboconfig", s.K8sTAPSpec, "Path to the config file.")
1✔
221
        fs.StringVar(&s.TestingFlagPath, "testingflag", s.TestingFlagPath, "Path to the testing flag.")
1✔
222
        fs.StringVar(&s.KubeConfig, "k8s-kubeconfig", s.KubeConfig, "Path to kubeconfig file with authorization and master location information.")
1✔
223
        fs.BoolVar(&s.EnableProfiling, "profiling", false, "Enable profiling via web interface host:port/debug/pprof/.")
1✔
224
        fs.BoolVar(&s.UseUUID, "stitch-uuid", true, "Use VirtualMachine's UUID to do stitching, otherwise IP is used.")
1✔
225
        fs.IntVar(&s.KubeletPort, "kubelet-port", DefaultKubeletPort, "The port of the kubelet runs on.")
1✔
226
        fs.BoolVar(&s.EnableKubeletHttps, "kubelet-https", DefaultKubeletHttps, "Indicate if Kubelet is running on https server.")
1✔
227
        fs.BoolVar(&s.UseNodeProxyEndpoint, "use-node-proxy-endpoint", false, "Indicate if Kubelet queries should be routed through APIServer node proxy endpoint.")
1✔
228
        fs.BoolVar(&s.ForceSelfSignedCerts, "kubelet-force-selfsigned-cert", true, "Indicate if we must use self-signed cert.")
1✔
229
        fs.BoolVar(&s.FailVolumePodMoves, "fail-volume-pod-moves", true, "Indicate if kubeturbo should fail to move pods which have volumes attached. Default is set to true.")
1✔
230
        fs.BoolVar(&s.UpdateQuotaToAllowMoves, "update-quota-to-allow-moves", true, "Indicate if kubeturbo should try to update namespace quotas to allow pod moves when quota(s) is/are full. Default is set to true.")
1✔
231
        fs.StringVar(&k8sVersion, "k8sVersion", k8sVersion, "[deprecated] the kubernetes server version; for openshift, it is the underlying Kubernetes' version.")
1✔
232
        fs.StringVar(&noneSchedulerName, "noneSchedulerName", noneSchedulerName, "[deprecated] a none-exist scheduler name, to prevent controller to create Running pods during move Action.")
1✔
233
        fs.IntVar(&s.DiscoveryIntervalSec, "discovery-interval-sec", defaultDiscoveryIntervalSec, "The discovery interval in seconds.")
1✔
234
        fs.IntVar(&s.ValidationWorkers, "validation-workers", DefaultValidationWorkers, "The validation workers")
1✔
235
        fs.IntVar(&s.ValidationTimeout, "validation-timeout-sec", DefaultValidationTimeout, "The validation timeout in seconds.")
1✔
236
        fs.IntVar(&s.DiscoveryWorkers, "discovery-workers", DefaultDiscoveryWorkers, "The number of discovery workers.")
1✔
237
        fs.IntVar(&s.DiscoveryTimeoutSec, "discovery-timeout-sec", DefaultDiscoveryTimeoutSec, "The discovery timeout in seconds for each discovery worker.")
1✔
238
        fs.IntVar(&s.DiscoverySamples, "discovery-samples", DefaultDiscoverySamples, "The number of resource usage data samples to be collected from kubelet in each full discovery cycle. This should be no larger than 60.")
1✔
239
        fs.IntVar(&s.DiscoverySampleIntervalSec, "discovery-sample-interval", DefaultDiscoverySampleIntervalSec, "The discovery interval in seconds to collect additional resource usage data samples from kubelet. This should be no smaller than 10 seconds.")
1✔
240
        fs.IntVar(&s.GCIntervalMin, "garbage-collection-interval", DefaultGCIntervalMin, "The garbage collection interval in minutes for possible leaked pods from actions failed because of kubeturbo restarts. Default value is 20 mins.")
1✔
241
        fs.IntVar(&s.ItemsPerListQuery, "items-per-list-query", 0, "Number of workload controller items the list api call should request for.")
1✔
242
        fs.StringSliceVar(&s.sccSupport, "scc-support", defaultSccSupport, "The SCC list allowed for executing pod actions, e.g., --scc-support=restricted,anyuid or --scc-support=* to allow all. Default allowed scc is [*].")
1✔
243
        // So far we have noticed cluster api support only in openshift clusters and our implementation works only for openshift
1✔
244
        // It thus makes sense to have openshifts machine api namespace as our default cluster api namespace
1✔
245
        fs.StringVar(&s.ClusterAPINamespace, "cluster-api-namespace", "openshift-machine-api", "The Cluster API namespace.")
1✔
246
        fs.StringVar(&s.BusyboxImage, "busybox-image", "busybox", "The complete image uri used for fallback node cpu frequency getter job.")
1✔
247
        fs.StringVar(&s.BusyboxImagePullSecret, "busybox-image-pull-secret", "", "The name of the secret that stores the image pull credentials for busybox image.")
1✔
248
        fs.StringVar(&s.CpufreqJobExcludeNodeLabels, "cpufreq-job-exclude-node-labels", "", "The comma separated list of key=value node label pairs for the nodes (for example windows nodes) to be excluded from running job based cpufrequency getter.")
1✔
249
        fs.StringVar(&s.containerUtilizationDataAggStrategy, "cnt-utilization-data-agg-strategy", agg.DefaultContainerUtilizationDataAggStrategy, "Container utilization data aggregation strategy.")
1✔
250
        fs.StringVar(&s.containerUsageDataAggStrategy, "cnt-usage-data-agg-strategy", agg.DefaultContainerUsageDataAggStrategy, "Container usage data aggregation strategy.")
1✔
251
        fs.IntVar(&s.readinessRetryThreshold, "readiness-retry-threshold", DefaultReadinessRetryThreshold, "When the pod readiness check fails, Kubeturbo will try readinessRetryThreshold times before giving up. Defaults to 60.")
1✔
252
        // Flags for gitops based action execution
1✔
253
        fs.StringVar(&s.gitConfig.GitSecretNamespace, "git-secret-namespace", "", "The namespace of the secret which holds the git credentials.")
1✔
254
        fs.StringVar(&s.gitConfig.GitSecretName, "git-secret-name", "", "The name of the secret which holds the git credentials.")
1✔
255
        fs.StringVar(&s.gitConfig.GitUsername, "git-username", "", "The user name to be used to push changes to git.")
1✔
256
        fs.StringVar(&s.gitConfig.GitEmail, "git-email", "", "The email to be used to push changes to git.")
1✔
257
        fs.StringVar(&s.gitConfig.CommitMode, "git-commit-mode", "direct", "The commit mode that should be used for git action executions. One of request|direct. Defaults to direct.")
1✔
258
        // CpuFreqGetter image and secret
1✔
259
        fs.StringVar(&s.CpuFrequencyGetterImage, "cpufreqgetter-image", "icr.io/cpopen/turbonomic/cpufreqgetter", "The complete cpufreqgetter image uri used for fallback node cpu frequency getter job.")
260
        fs.StringVar(&s.CpuFrequencyGetterPullSecret, "cpufreqgetter-image-pull-secret", "", "The name of the secret that stores the image pull credentials for cpufreqgetter image.")
261
        fs.BoolVar(&s.CleanupSccRelatedResources, "cleanup-scc-impersonation-resources", true, "Enable cleanup the resources for scc impersonation.")
×
262
        fs.BoolVar(&s.SkipCreatingSccRelatedResources, "skip-creating-scc-impersonation-resources", false, "Skip creating the resources for scc impersonation.")
×
263
        fs.String("satellite-location-provider", "", "The IBM cloud satellite location provider, it only supports azure as of today. Defaults to none.")
×
264
}
×
265

×
266
// create an eventRecorder to send events to Kubernetes APIserver
×
267
func createRecorder(kubecli *kubernetes.Clientset) record.EventRecorder {
×
268
        // Create a new broadcaster which will send events we generate to the apiserver
×
269
        eventBroadcaster := record.NewBroadcaster()
×
270
        eventBroadcaster.StartLogging(glog.Infof)
×
271
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
×
272
                Interface: v1core.New(kubecli.CoreV1().RESTClient()).Events(apiv1.NamespaceAll),
273
        })
×
274
        // this EventRecorder can be used to send events to this EventBroadcaster
×
275
        // with the given event source.
×
276
        return eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "kubeturbo"})
×
277
}
×
278

×
279
// createKubeConfigOrDie builds a REST config from s.Master / s.KubeConfig and
// exits the process (code 1) if that fails. As a side effect, when WireMock
// mode is enabled it rewrites the API server host and forces
// s.UseNodeProxyEndpoint to true.
func (s *VMTServer) createKubeConfigOrDie() *restclient.Config {
	kubeConfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.KubeConfig)
	if err != nil {
		glog.Errorf("Fatal error: failed to get kubeconfig:  %s", err)
		os.Exit(1)
	}
	// This specifies the number and the max number of query per second to the api server.
	kubeConfig.QPS = 20.0
	kubeConfig.Burst = 30
	// Check if WireMock mode is enabled or not, if enabled, connect to WireMock
	if wiremockMode, wiremockSvcUrl := configs.GetCurrentWireMockMode(); wiremockMode {
		glog.Warningf("Kubeturbo is working on the WireMock mode. It's connecting to %v as the apiserver.", wiremockSvcUrl)
		kubeConfig.Host = wiremockSvcUrl
		// Force use-node-proxy-endpoint to true to make sure the kubelet scrape traffic will go through apiserver
		s.UseNodeProxyEndpoint = true
	}

	return kubeConfig
}
×
298

×
299
func (s *VMTServer) createKubeClientOrDie(kubeConfig *restclient.Config) *kubernetes.Clientset {
×
300
        kubeClient, err := kubernetes.NewForConfig(kubeConfig)
×
301
        if err != nil {
×
302
                glog.Errorf("Fatal error: failed to create kubeClient:%v", err)
303
                os.Exit(1)
304
        }
305

306
        return kubeClient
307
}
×
308

×
309
// ensureBusyboxImageBackwardCompatibility honors the deprecated
// --busybox-image and --busybox-image-pull-secret flags: when only the old
// flag was set explicitly (the new cpufreqgetter flag is still at its
// default), the old value is carried over to the new field. In every other
// combination the cpufreqgetter flag value wins.
func (s *VMTServer) ensureBusyboxImageBackwardCompatibility() {
	if s.CpuFrequencyGetterImage == "icr.io/cpopen/turbonomic/cpufreqgetter" && s.BusyboxImage != "busybox" {
		// Somebody has set --busybox-image only explicitly, for example coming from an old configuration
		// we should use it
		s.CpuFrequencyGetterImage = s.BusyboxImage
	}
	// Other cases for example below, --cpufreqgetter-image value will always take precedence:
	// if s.CpuFrequencyGetterImage == "icr.io/cpopen/turbonomic/cpufreqgetter" && s.BusyboxImage == "busybox"
	// if s.CpuFrequencyGetterImage != "icr.io/cpopen/turbonomic/cpufreqgetter" && s.BusyboxImage != "busybox"
	// if s.CpuFrequencyGetterImage == "icr.io/cpopen/turbonomic/cpufreqgetter" && s.BusyboxImage != "busybox"

	if s.CpuFrequencyGetterPullSecret == "" && s.BusyboxImagePullSecret != "" {
		// Somebody has set --busybox-image-pull-secret only explicitly, for example coming from an old configuration
		// we should use it
		s.CpuFrequencyGetterPullSecret = s.BusyboxImagePullSecret
	}
	// Other cases for example below, --cpufreqgetter-image-pull-secret value will always take precedence:
	// if s.CpuFrequencyGetterPullSecret != "" && s.BusyboxImagePullSecret != ""
	// if s.CpuFrequencyGetterPullSecret != "" && s.BusyboxImagePullSecret == ""
}
×
329

×
330
// CreateKubeletClientOrDie builds the kubelet client used to scrape node
// metrics, configured from the server's kubelet port/https/cert settings,
// and exits the process (code 1) on failure.
//
// fallbackClient and the cpuFreqGetter parameters are used by the client for
// the fallback node CPU-frequency getter job; cpufreqJobExcludeNodeLabels
// lists node labels whose nodes must not run that job; useProxyEndpoint
// routes kubelet traffic through the API server node proxy endpoint.
func (s *VMTServer) CreateKubeletClientOrDie(kubeConfig *restclient.Config, fallbackClient *kubernetes.Clientset,
	cpuFreqGetterImage, imagePullSecret string, cpufreqJobExcludeNodeLabels map[string]set.Set, useProxyEndpoint bool,
) *kubeclient.KubeletClient {
	kubeletClient, err := kubeclient.NewKubeletConfig(kubeConfig).
		WithPort(s.KubeletPort).
		EnableHttps(s.EnableKubeletHttps).
		ForceSelfSignedCerts(s.ForceSelfSignedCerts).
		// Timeout(to).
		Create(fallbackClient, cpuFreqGetterImage, imagePullSecret, cpufreqJobExcludeNodeLabels, useProxyEndpoint)
	if err != nil {
		glog.Errorf("Fatal error: failed to create kubeletClient: %v", err)
		os.Exit(1)
	}

	return kubeletClient
}
×
346

347
func (s *VMTServer) checkFlag() error {
×
348
        if s.KubeConfig == "" && s.Master == "" {
×
349
                glog.Warningf("Neither --kubeconfig nor --master was specified.  Using default API client.  This might not work.")
×
350
        }
×
351

352
        if s.Master != "" {
×
353
                glog.V(3).Infof("Master is %s", s.Master)
×
354
        }
×
355

356
        if s.TestingFlagPath != "" {
×
357
                flag.SetPath(s.TestingFlagPath)
×
358
        }
×
359

360
        ip := net.ParseIP(s.Address)
×
361
        if ip == nil {
362
                return fmt.Errorf("wrong ip format:%s", s.Address)
363
        }
364

×
365
        if s.Port < 1 {
×
366
                return fmt.Errorf("Port[%d] should be bigger than 0.", s.Port)
×
367
        }
×
368

369
        if s.KubeletPort < 1 {
×
370
                return fmt.Errorf("[KubeletPort[%d] should be bigger than 0.", s.KubeletPort)
×
371
        }
×
372

×
373
        return nil
×
374
}
×
375

×
376
// Run runs the specified VMTServer.  This should never exit.
//
// Startup sequence: validate flags, build the various API clients, parse the
// TAP service spec and feature gates, construct the Kubeturbo TAP service,
// start auxiliary goroutines (SCC management, http service, garbage
// collector), register shutdown hooks, and finally block in the TAP service
// main loop until it stops.
func (s *VMTServer) Run() {
	if err := s.checkFlag(); err != nil {
		glog.Fatalf("Check flag failed: %v. Abort.", err.Error())
	}

	kubeConfig := s.createKubeConfigOrDie()
	glog.V(3).Infof("kubeConfig: %+v", kubeConfig)

	kubeClient := s.createKubeClientOrDie(kubeConfig)

	// Create controller runtime client that support custom resources
	runtimeClient, err := runtimeclient.New(kubeConfig, runtimeclient.Options{Scheme: customScheme})
	if err != nil {
		glog.Fatalf("Failed to create controller runtime client: %v.", err)
	}

	// Openshift client for deploymentconfig resize forced rollouts
	osClient, err := osclient.NewForConfig(kubeConfig)
	if err != nil {
		glog.Fatalf("Failed to generate openshift client for kubernetes target: %v", err)
	}

	// TODO: Replace dynamicClient with runtimeClient
	dynamicClient, err := dynamic.NewForConfig(kubeConfig)
	if err != nil {
		glog.Fatalf("Failed to generate dynamic client for kubernetes target: %v", err)
	}

	// Discover which API group/version serves deployments and replicasets on
	// this cluster; a failure here is non-fatal (warning only).
	util.K8sAPIDeploymentGV, err = discoverk8sAPIResourceGV(kubeClient, util.DeploymentResName)
	if err != nil {
		glog.Warningf("Failure in discovering k8s deployment API group/version: %v", err.Error())
	}
	glog.V(2).Infof("Using group version %v for k8s deployments", util.K8sAPIDeploymentGV)

	util.K8sAPIReplicasetGV, err = discoverk8sAPIResourceGV(kubeClient, util.ReplicaSetResName)
	if err != nil {
		glog.Warningf("Failure in discovering k8s replicaset API group/version: %v", err.Error())
	}
	glog.V(2).Infof("Using group version %v for k8s replicasets", util.K8sAPIReplicasetGV)

	glog.V(3).Infof("Turbonomic config path is: %v", s.K8sTAPSpec)

	k8sTAPSpec, err := kubeturbo.ParseK8sTAPServiceSpec(s.K8sTAPSpec,
		kubeConfig.Host, kubeConfig, kubeClient, configs.CollectK8sTargetAndProbeInfo)
	if err != nil {
		glog.Fatalf("Failed to generate correct TAP config: %v", err.Error())
	}

	// Feature gates from the TAP spec override the compiled-in defaults.
	if k8sTAPSpec.FeatureGates != nil {
		err = utilfeature.DefaultMutableFeatureGate.SetFromMap(k8sTAPSpec.FeatureGates)
		if err != nil {
			glog.Fatalf("Invalid Feature Gates: %v", err)
		}
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.GoMemLimit) {
		glog.V(2).Info("Memory Optimisations are enabled.")
		// AUTOMEMLIMIT_DEBUG environment variable enables debug logging of AUTOMEMLIMIT
		// GoMemLimit will be set during the start of each discovery, see K8sDiscoveryClient.Discover,
		// as memory limit may change overtime
		_ = os.Setenv("AUTOMEMLIMIT_DEBUG", "true")
		if s.ItemsPerListQuery != 0 {
			// Perform sanity check on user specified value of itemsPerListQuery;
			// negative or too-low values are clamped to the default.
			if s.ItemsPerListQuery < processor.DefaultItemsPerGiMemory {
				var errMsg string
				if s.ItemsPerListQuery < 0 {
					errMsg = "negative"
				} else {
					errMsg = "set too low"
				}
				glog.Warningf("Argument --items-per-list-query is %s (%v). Setting it to the default value of %d.",
					errMsg, s.ItemsPerListQuery, processor.DefaultItemsPerGiMemory)
				s.ItemsPerListQuery = processor.DefaultItemsPerGiMemory
			} else {
				glog.V(2).Infof("Set items per list API call to the user specified value: %v.", s.ItemsPerListQuery)
			}
		}
	} else {
		glog.V(2).Info("Memory Optimisations are not enabled.")
	}

	excludeLabelsMap, err := nodeUtil.LabelMapFromNodeSelectorString(s.CpufreqJobExcludeNodeLabels)
	if err != nil {
		glog.Fatalf("Invalid cpu frequency exclude node label selectors: %v. The selectors "+
			"should be a comma saperated list of key=value node label pairs", err)
	}

	s.ensureBusyboxImageBackwardCompatibility()
	kubeletClient := s.CreateKubeletClientOrDie(kubeConfig, kubeClient, s.CpuFrequencyGetterImage,
		s.CpuFrequencyGetterPullSecret, excludeLabelsMap, s.UseNodeProxyEndpoint)
	// The cluster API (machine) client is optional: on failure we continue
	// with caClient == nil rather than aborting.
	caClient, err := machineClient.NewForConfig(kubeConfig)
	if err != nil {
		glog.Errorf("Failed to generate correct TAP config: %v", err.Error())
		caClient = nil
	}

	// Interface to discover turbonomic ORM mappings (legacy and v2) for resize actions
	ormClientManager := resourcemapping.NewORMClientManager(dynamicClient, kubeConfig)

	// Configuration for creating the Kubeturbo TAP service
	vmtConfig := kubeturbo.NewVMTConfig2()
	vmtConfig.WithTapSpec(k8sTAPSpec).
		WithKubeClient(kubeClient).
		WithKubeConfig(kubeConfig).
		WithDynamicClient(dynamicClient).
		WithControllerRuntimeClient(runtimeClient).
		WithORMClientManager(ormClientManager).
		WithKubeletClient(kubeletClient).
		WithClusterAPIClient(caClient).
		WithOpenshiftClient(osClient).
		WithVMPriority(s.VMPriority).
		WithVMIsBase(s.VMIsBase).
		UsingUUIDStitch(s.UseUUID).
		WithDiscoveryInterval(s.DiscoveryIntervalSec).
		WithValidationTimeout(s.ValidationTimeout).
		WithValidationWorkers(s.ValidationWorkers).
		WithDiscoveryWorkers(s.DiscoveryWorkers).
		WithDiscoveryTimeout(s.DiscoveryTimeoutSec).
		WithDiscoverySamples(s.DiscoverySamples).
		WithDiscoverySampleIntervalSec(s.DiscoverySampleIntervalSec).
		WithSccSupport(s.sccSupport).
		WithCAPINamespace(s.ClusterAPINamespace).
		WithContainerUtilizationDataAggStrategy(s.containerUtilizationDataAggStrategy).
		WithContainerUsageDataAggStrategy(s.containerUsageDataAggStrategy).
		WithVolumePodMoveConfig(s.FailVolumePodMoves).
		WithQuotaUpdateConfig(s.UpdateQuotaToAllowMoves).
		WithReadinessRetryThreshold(s.readinessRetryThreshold).
		WithClusterKeyInjected(s.ClusterKeyInjected).
		WithItemsPerListQuery(s.ItemsPerListQuery)

	// Git settings only apply when the GitopsApps feature gate is on;
	// otherwise any provided git flags are logged and ignored.
	if utilfeature.DefaultFeatureGate.Enabled(features.GitopsApps) {
		vmtConfig.WithGitConfig(s.gitConfig)
	} else {
		if s.gitConfig.GitEmail != "" ||
			s.gitConfig.GitSecretName != "" ||
			s.gitConfig.GitSecretNamespace != "" ||
			s.gitConfig.GitUsername != "" {
			glog.V(2).Infof("Feature: %v is not enabled, arg values set for git-email: %s, git-username: %s "+
				"git-secret-name: %s, git-secret-namespace: %s will be ignored.", features.GitopsApps,
				s.gitConfig.GitEmail, s.gitConfig.GitUsername, s.gitConfig.GitSecretName, s.gitConfig.GitSecretNamespace)
		}
	}
	glog.V(3).Infof("Finished creating turbo configuration: %+v", vmtConfig)

	// The KubeTurbo TAP service
	k8sTAPService, err := kubeturbo.NewKubernetesTAPService(vmtConfig)
	if err != nil {
		glog.Fatalf("Unexpected error while creating Kubernetes TAP service: %s", err)
	}
	s.K8sTAPService = *k8sTAPService

	if k8sTAPSpec.K8sTargetConfig.IsOcp {
		// Its a must to include the namespace env var in the kubeturbo pod spec.
		ns := util.GetKubeturboNamespace()
		// Update scc resources in parallel.
		go ManageSCCs(ns, dynamicClient, kubeClient, s.SkipCreatingSccRelatedResources)
	}

	// The client for healthz, debug, and prometheus
	go s.startHttp()

	// Register shutdown hooks: optionally SCC cleanup, then server disconnect.
	cleanupWG := &sync.WaitGroup{}
	cleanupSCCFn := func() {
		ns := util.GetKubeturboNamespace()
		CleanUpSCCMgmtResources(ns, dynamicClient, kubeClient)
	}
	disconnectFn := func() {
		// Disconnect from Turbo server when Kubeturbo is shutdown
		// Close the mediation container including the endpoints. It avoids the
		// invalid endpoints remaining in the server side. See OM-28801.
		k8sTAPService.DisconnectFromTurbo()
	}
	var cleanupFuns []cleanUp
	if s.CleanupSccRelatedResources && !s.SkipCreatingSccRelatedResources {
		cleanupFuns = append(cleanupFuns, cleanupSCCFn)
	}
	cleanupFuns = append(cleanupFuns, disconnectFn)
	handleExit(cleanupWG, cleanupFuns...)

	// Background garbage collector for pods leaked by interrupted actions.
	gCChan := make(chan bool)
	defer close(gCChan)
	worker.NewGarbageCollector(kubeClient, dynamicClient, gCChan, s.GCIntervalMin*60, time.Minute*30, k8sTAPSpec.IsOcp).StartCleanup()

	glog.V(1).Infof("********** Start running Kubeturbo Service **********")
	k8sTAPService.Run()
	glog.V(1).Info("Kubeturbo service is stopped.")

	// Wait for the exit handlers registered in handleExit to finish.
	cleanupWG.Wait()
	glog.V(1).Info("Cleanup completed. Exiting gracefully.")
}
×
567

×
568
func (s *VMTServer) startHttp() {
×
569
        mux := http.NewServeMux()
×
570

571
        // healthz
×
572
        healthz.InstallHandler(mux)
×
573

×
574
        // debug
×
575
        if s.EnableProfiling {
×
576
                mux.HandleFunc("/debug/pprof/", pprof.Index)
577
                mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
578
                mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
579
                mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
1✔
580

1✔
581
                // prometheus.metrics
1✔
582
                mux.Handle("/metrics", promhttp.Handler())
1✔
583
        }
1✔
584

1✔
585
        server := &http.Server{
1✔
586
                Addr:    net.JoinHostPort(s.Address, strconv.Itoa(s.Port)),
1✔
587
                Handler: mux,
1✔
588
        }
1✔
589
        glog.Fatal(server.ListenAndServe())
2✔
590
}
1✔
591

1✔
592
// handleExit disconnects the tap service from Turbo service when Kubeturbo is shotdown
1✔
593
func handleExit(wg *sync.WaitGroup, cleanUpFns ...cleanUp) { // k8sTAPService *kubeturbo.K8sTAPService) {
2✔
594
        glog.V(4).Infof("*** Handling Kubeturbo Termination ***")
1✔
595
        sigChan := make(chan os.Signal, 1)
1✔
596
        signal.Notify(sigChan,
1✔
597
                syscall.SIGTERM,
2✔
598
                syscall.SIGINT,
1✔
599
                syscall.SIGQUIT,
1✔
600
                syscall.SIGHUP)
1✔
601

602
        wg.Add(1)
603
        go func() {
1✔
604
                select {
605
                case sig := <-sigChan:
606
                        glog.V(2).Infof("Signal %s received. Will run exit handlers.. \n", sig)
607
                        for _, f := range cleanUpFns {
×
608
                                // The default graceful timeout, once a container is sent a SIGTERM before it is
×
609
                                // killed in k8s is 30 seconds. We want to make maximum use of that time.
×
610
                                wg.Add(1)
×
611
                                go func(f cleanUp) {
×
612
                                        f()
×
613
                                        wg.Done()
×
614
                                }(f)
×
615
                        }
×
616
                }
×
617
                wg.Done()
×
618
        }()
×
619
}
620

×
621
func discoverk8sAPIResourceGV(client *kubernetes.Clientset, resourceName string) (schema.GroupVersion, error) {
×
622
        // We optimistically use a globally set default if we cannot discover the GV.
×
623
        defaultGV := util.K8sAPIDeploymentReplicasetDefaultGV
×
624

×
625
        apiResourceLists, err := client.ServerPreferredResources()
626
        if apiResourceLists == nil {
627
                return defaultGV, err
×
628
        }
×
629
        if err != nil {
×
630
                // We don't exit here as ServerPreferredResources can return the resource list even with errors.
×
631
                glog.Warningf("Error listing api resources: %v", err)
×
632
        }
633

634
        latestExtensionsVersion := schema.GroupVersion{Group: util.K8sExtensionsGroupName, Version: ""}
×
635
        latestAppsVersion := schema.GroupVersion{Group: util.K8sAppsGroupName, Version: ""}
×
636
        for _, apiResourceList := range apiResourceLists {
637
                if len(apiResourceList.APIResources) == 0 {
638
                        continue
×
639
                }
×
640

×
641
                found := false
×
642
                for _, apiResource := range apiResourceList.APIResources {
643
                        if apiResource.Name == resourceName {
×
644
                                found = true
×
645
                                break
×
646
                        }
×
647
                }
×
648
                if found == false {
×
649
                        continue
×
650
                }
651

652
                gv, err := schema.ParseGroupVersion(apiResourceList.GroupVersion)
×
653
                if err != nil {
×
654
                        return defaultGV, fmt.Errorf("error parsing GroupVersion: %v", err)
×
655
                }
×
656

×
657
                group := gv.Group
×
658
                version := gv.Version
×
659
                if group == util.K8sExtensionsGroupName {
660
                        latestExtensionsVersion.Version = latestComparedVersion(version, latestExtensionsVersion.Version)
661
                } else if group == util.K8sAppsGroupName {
×
662
                        latestAppsVersion.Version = latestComparedVersion(version, latestAppsVersion.Version)
×
663
                }
×
664
        }
×
665

×
666
        if latestAppsVersion.Version != "" {
667
                return latestAppsVersion, nil
668
        }
×
669
        if latestExtensionsVersion.Version != "" {
×
670
                return latestExtensionsVersion, nil
×
671
        }
×
672
        return defaultGV, nil
×
673
}
×
674

675
func latestComparedVersion(newVersion, existingVersion string) string {
×
676
        if existingVersion != "" && versionhelper.CompareKubeAwareVersionStrings(newVersion, existingVersion) <= 0 {
×
677
                return existingVersion
×
678
        }
×
679
        return newVersion
×
680
}
×
681

×
682
func ManageSCCs(ns string, dynClient dynamic.Interface, kubeClient kubernetes.Interface, skipCreatingResources bool) {
×
683
        sccList := GetSCCs(dynClient)
×
684
        if (sccList == nil) || (sccList != nil && len(sccList.Items) < 1) {
×
685
                // We don't need to bother as this cluster is most probably not openshift
×
686
                return
×
687
        }
688
        glog.V(3).Info("This looks like an openshift cluster and kubeturbo has appropriate permissions to manage SCCs.")
689

×
690
        if skipCreatingResources { // User is supposed to create the related resources by running the script create-scc-resources.sh
×
691
                glog.V(2).Infof("Skip creating SCC resources and start checking the SCC related resources")
×
692
                for _, scc := range sccList.Items {
×
693
                        sccName := scc.GetName()
×
694
                        if saName, isExisted := CheckSCCResoucesByName(sccName, ns, kubeClient); isExisted {
×
695
                                util.SCCMapping[sccName] = saName
×
696
                        }
×
697
                }
×
698
                glog.V(2).Infof("Scan the exsiting SCC impersonation related resources and add %v service accounts into the SCC mapping", len(util.SCCMapping))
×
699
                return
700
        }
×
701

×
702
        // If not skipCreatingResources, Kubeturbo will create them
×
703
        fail := true
×
704
        defer func() {
×
705
                if fail {
706
                        CleanUpSCCMgmtResources(ns, dynClient, kubeClient)
×
707
                }
×
708
        }()
×
709

×
710
        saNames := []string{}
711
        for _, scc := range sccList.Items {
712
                sccName := scc.GetName()
713
                saName, err := createSCCServiceAccount(ns, sccName, kubeClient)
714
                if err != nil {
715
                        // We have no option but to abort halfway and cleanup in case of persistent errors.
716
                        // We already retry couple of times in case of an error.
717
                        glog.Errorf("Error creating SCC ServicAccount '%s/%s'. %s.", ns, saName, err)
718
                        return
719
                }
720

721
                roleName, err := createSCCRole(ns, sccName, kubeClient)
×
722
                if err != nil {
×
723
                        glog.Errorf("Error creating SCC Role '%s/%s'. %s.", ns, roleName, err)
724
                        return
725
                }
×
726

×
727
                roleBindingName, err := createSCCRoleBinding(saName, ns, sccName, roleName, kubeClient)
×
728
                if err != nil {
×
729
                        glog.Errorf("Error creating SCC RoleBinding '%s/%s'. %s.", ns, roleBindingName, err)
730
                        return
×
731
                }
×
732

×
733
                // We use this map both for updating the user names in sccs and to cleanup the resources
734
                // in case of an error or at exit.
×
735
                // This has potential for race conditions, for example the service account was created
736
                // but not updated in this map when the exit was trigerred.
737
                // Ignoring this as of now because this can be no better then facing transient API errors
×
738
                // while deleting resources at exit, which will also leak resources behind.
×
739
                // Leaking resources is ok to some extent, because we use constant names and everything is
×
740
                // created within a namespace. Any leaked resources will automatically be cleaned up when
×
741
                // the kubeturbo namespace is deleted.
×
742
                // Also in case of kubeturbo restarts if there are leaked resources, kubeturbo will adopt them.
×
743
                util.SCCMapping[sccName] = saName
×
744
                saNames = append(saNames, saName)
×
745
        }
×
746

×
747
        clusterRoleName, err := createSCCClusterRole(ns, kubeClient)
×
748
        if err != nil {
×
749
                glog.Errorf("Error creating SCC Cluster Role '%s'. %s.", clusterRoleName, err)
×
750
                return
×
751
        }
×
752

753
        clusterRoleBindingName, err := createSCCClusterRoleBinding(saNames, ns, clusterRoleName, kubeClient)
×
754
        if err != nil {
×
755
                glog.Errorf("Error creating SCC ClusterRoleBinding '%s'. %s.", clusterRoleBindingName, err)
×
756
                return
×
757
        }
758

759
        fail = false
×
760
}
×
761

×
762
func GetSCCs(client dynamic.Interface) (sccList *unstructured.UnstructuredList) {
×
763
        res := schema.GroupVersionResource{
×
764
                Group:    util.OpenShiftAPISCCGV.Group,
×
765
                Version:  util.OpenShiftAPISCCGV.Version,
×
766
                Resource: util.OpenShiftSCCResName,
×
767
        }
×
768

×
769
        err := util.RetryDuring(util.TransientRetryTimes, 0,
×
770
                util.QuickRetryInterval, func() error {
×
771
                        var err error
772
                        sccList, err = client.Resource(res).List(context.TODO(), metav1.ListOptions{})
×
773
                        if err != nil {
×
774
                                glog.Warningf("Could not get openshift cluster sccs: %v", err)
×
775
                        }
×
776
                        return err
×
777
                })
778
        if err != nil {
779
                return nil
×
780
        }
781
        return sccList
782
}
×
783

×
784
func createSCCServiceAccount(namespace, sccName string, kubeClient kubernetes.Interface) (string, error) {
×
785
        sa := util.GetServiceAccountForSCC(sccName)
×
786
        saName := sa.Name
×
787

×
788
        // TODO: an improvement on retries would be to retry only on transient errors.
×
789
        err := util.RetryDuring(util.TransientRetryTimes, 0,
×
790
                util.QuickRetryInterval, func() error {
×
791
                        _, err := kubeClient.CoreV1().ServiceAccounts(namespace).Create(context.TODO(), sa, metav1.CreateOptions{})
×
792
                        if apierrors.IsAlreadyExists(err) {
×
793
                                glog.V(2).Infof("SCC ServiceAccount: %s/%s already exists.", namespace, saName)
794
                                return nil
×
795
                        }
×
796

×
797
                        if err != nil {
×
798
                                return err
799
                        }
×
800

×
801
                        glog.V(3).Infof("Service Account: %s created.", saName)
802
                        return nil
803
                })
×
804

805
        return saName, err
806
}
×
807

×
808
func createSCCRole(namespace, sccName string, kubeClient kubernetes.Interface) (string, error) {
×
809
        role := util.GetRoleForSCC(sccName)
×
810
        roleName := role.Name
×
811

×
812
        err := util.RetryDuring(util.TransientRetryTimes, 0,
×
813
                util.QuickRetryInterval, func() error {
×
814
                        _, err := kubeClient.RbacV1().Roles(namespace).Create(context.TODO(), role, metav1.CreateOptions{})
×
815
                        if apierrors.IsAlreadyExists(err) {
×
816
                                glog.V(3).Infof("SCC Role: %s already exists.", roleName)
×
817
                                return nil
818
                        }
×
819

×
820
                        if err != nil {
×
821
                                return err
×
822
                        }
823

×
824
                        glog.V(3).Infof("SCC Role: %s created.", roleName)
×
825
                        return nil
826
                })
827

828
        return roleName, err
×
829
}
×
830

×
831
func createSCCRoleBinding(saName, namespace, sccName, roleName string, kubeClient kubernetes.Interface) (string, error) {
×
832
        rb := util.GetRoleBindingForSCC(saName, namespace, sccName, roleName)
×
833
        rbName := rb.Name
×
834

×
835
        err := util.RetryDuring(util.TransientRetryTimes, 0,
×
836
                util.QuickRetryInterval, func() error {
×
837
                        _, err := kubeClient.RbacV1().RoleBindings(namespace).Create(context.TODO(), rb, metav1.CreateOptions{})
×
838
                        if apierrors.IsAlreadyExists(err) {
×
839
                                // TODO: We ignore the case where a new scc might appear between kubeturbo runs
840
                                // That means a new scc definition will be picked across restarts only.
×
841
                                glog.V(3).Infof("SCC RoleBinding: %s already exists.", rbName)
×
842
                                return nil
×
843
                        }
×
844

845
                        if err != nil {
×
846
                                return err
×
847
                        }
848

849
                        glog.V(3).Infof("SCC RoleBinding: %s created.", rbName)
×
850
                        return nil
851
                })
852

×
853
        return rbName, err
×
854
}
×
855

×
856
func createSCCClusterRole(ns string, kubeClient kubernetes.Interface) (string, error) {
×
857
        clusterRole := util.GetClusterRoleForSCC(ns)
×
858
        clusterRoleName := clusterRole.Name
×
859

×
860
        err := util.RetryDuring(util.TransientRetryTimes, 0,
×
861
                util.QuickRetryInterval, func() error {
×
862
                        _, err := kubeClient.RbacV1().ClusterRoles().Create(context.TODO(), clusterRole, metav1.CreateOptions{})
×
863
                        if apierrors.IsAlreadyExists(err) {
×
864
                                glog.V(3).Infof("SCC Cluster Role: %s already exists.", clusterRoleName)
×
865
                                return nil
866
                        }
×
867

×
868
                        if err != nil {
×
869
                                return err
×
870
                        }
871

×
872
                        glog.V(3).Infof("SCC Cluster Role: %s created.", clusterRoleName)
×
873
                        return nil
874
                })
875

876
        return clusterRoleName, err
×
877
}
×
878

×
879
func createSCCClusterRoleBinding(saNames []string, namespace, roleName string, kubeClient kubernetes.Interface) (string, error) {
×
880
        crb := util.GetClusterRoleBindingForSCC(saNames, namespace, roleName)
×
881
        crbName := crb.Name
×
882

×
883
        err := util.RetryDuring(util.TransientRetryTimes, 0,
×
884
                util.QuickRetryInterval, func() error {
×
885
                        _, err := kubeClient.RbacV1().ClusterRoleBindings().Create(context.TODO(), crb, metav1.CreateOptions{})
×
886
                        if apierrors.IsAlreadyExists(err) {
×
887
                                // TODO: We ignore the case where a new scc might appear between kubeturbo runs
×
888
                                // That means a new scc definition will be picked across restarts only.
×
889
                                glog.V(3).Infof("SCC ClusterRoleBinding: %s already exists. "+
×
890
                                        "It will be updated with the latest subjects.", crbName)
×
891
                                _, err := kubeClient.RbacV1().ClusterRoleBindings().Update(context.TODO(), crb, metav1.UpdateOptions{})
×
892
                                return err
×
893
                        }
×
894

×
895
                        if err != nil {
×
896
                                return err
897
                        }
×
898

×
899
                        glog.V(3).Infof("SCC ClusterRoleBinding: %s created.", crbName)
×
900
                        return nil
×
901
                })
902

×
903
        return crbName, err
×
904
}
×
905

×
906
func CleanUpSCCMgmtResources(ns string, dynClient dynamic.Interface, kubeClient kubernetes.Interface) {
907
        if len(util.SCCMapping) < 1 {
908
                glog.V(2).Infof("SCC management resource cleanup is not needed.")
909
                return
910
        }
×
911
        glog.V(2).Infof("SCC management resource cleanup started.")
×
912

×
913
        for sccName, saName := range util.SCCMapping {
914
                // Errors are found and logged in the called methods.
915
                // We retry couple of times on errors but continue to delete other
×
916
                // resources on persistent error on a particular resource.
×
917
                deleteSCCRole(ns, sccName, kubeClient)
×
918
                deleteSCCRoleBinding(ns, sccName, kubeClient)
×
919
                if err := util.RetryDuring(util.TransientRetryTimes, 0,
×
920
                        util.QuickRetryInterval, func() error {
×
921
                                err := kubeClient.CoreV1().ServiceAccounts(ns).Delete(context.TODO(), saName, metav1.DeleteOptions{})
×
922
                                if apierrors.IsNotFound(err) {
×
923
                                        glog.V(2).Infof("SCC ServiceAccount: %s/%s already deleted.", ns, saName)
×
924
                                        return nil
925
                                }
×
926

×
927
                                if err != nil {
×
928
                                        glog.Errorf("Error deleting SCC ServicAccount: %s/%s, %s.", ns, saName, err)
×
929
                                        return err
×
930
                                }
931

×
932
                                return nil
×
933
                        }); err != nil {
×
934
                        glog.Error(err)
935
                }
936

×
937
        }
×
938

×
939
        // Errors are found and logged in the called methods.
×
940
        deleteSCCClusterRoleBinding(ns, kubeClient)
×
941
        deleteSCCClusterRole(ns, kubeClient)
×
942
        glog.V(2).Infof("SCC management resource cleanup completed.")
×
943
}
×
944

945
func deleteSCCRole(namespace, sccName string, kubeClient kubernetes.Interface) {
×
946
        roleName := util.RoleNameForSCC(sccName)
×
947
        err := util.RetryDuring(util.TransientRetryTimes, 0,
×
948
                util.QuickRetryInterval, func() error {
×
949
                        err := kubeClient.RbacV1().Roles(namespace).Delete(context.TODO(), roleName, metav1.DeleteOptions{})
950
                        if apierrors.IsNotFound(err) {
×
951
                                glog.V(3).Infof("SCC Role: %s already deleted.", roleName)
×
952
                                return nil
×
953
                        }
954

955
                        if err != nil {
×
956
                                glog.Errorf("Error deleting SCC Role: %s, %s.", roleName, err)
×
957
                                return err
×
958
                        }
×
959
                        return nil
×
960
                })
×
961
        if err != nil {
×
962
                glog.Errorf("Error deleting SCC role. %v", err)
×
963
        }
×
964
}
965

×
966
func deleteSCCRoleBinding(namespace, sccName string, kubeClient kubernetes.Interface) {
×
967
        roleBindingName := util.RoleBindingNameForSCC(sccName)
×
968
        err := util.RetryDuring(util.TransientRetryTimes, 0,
×
969
                util.QuickRetryInterval, func() error {
×
970
                        err := kubeClient.RbacV1().RoleBindings(namespace).Delete(context.TODO(), roleBindingName, metav1.DeleteOptions{})
971
                        if apierrors.IsNotFound(err) {
×
972
                                glog.V(3).Infof("SCC RoleBinding: %s already deleted.", roleBindingName)
×
973
                        }
×
974

975
                        if err != nil {
976
                                glog.Errorf("Error deleting SCC RoleBinding: %s, %s.", roleBindingName, err)
×
977
                        }
×
978
                        return nil
×
979
                })
×
980
        if err != nil {
×
981
                glog.Errorf("Error deleting SCC role binding. %v", err)
×
982
        }
×
983
}
×
984

985
func deleteSCCClusterRole(namespace string, kubeClient kubernetes.Interface) {
×
986
        clusterRoleName := fmt.Sprintf("%s-%s", util.SCCClusterRoleName, namespace)
×
987
        err := util.RetryDuring(util.TransientRetryTimes, 0,
×
988
                util.QuickRetryInterval, func() error {
×
989
                        err := kubeClient.RbacV1().ClusterRoles().Delete(context.TODO(), clusterRoleName, metav1.DeleteOptions{})
990
                        if apierrors.IsNotFound(err) {
×
991
                                glog.V(3).Infof("SCC ClusterRole: %s already deleted.", clusterRoleName)
×
992
                                return nil
×
993
                        }
994

995
                        if err != nil {
996
                                glog.Errorf("Error deleting SCC ClusterRole: %s, %s.", clusterRoleName, err)
997
                                return err
×
998
                        }
×
999
                        return nil
×
1000
                })
×
1001
        if err != nil {
×
1002
                glog.Errorf("Error deleting SCC cluster role. %v", err)
×
1003
        }
×
1004
}
×
1005

×
1006
func deleteSCCClusterRoleBinding(namespace string, kubeClient kubernetes.Interface) {
×
1007
        clusterRoleBindingName := fmt.Sprintf("%s-%s", util.SCCClusterRoleBindingName, namespace)
×
1008
        err := util.RetryDuring(util.TransientRetryTimes, 0,
×
1009
                util.QuickRetryInterval, func() error {
×
1010
                        err := kubeClient.RbacV1().ClusterRoleBindings().Delete(context.TODO(), clusterRoleBindingName, metav1.DeleteOptions{})
×
1011
                        if apierrors.IsNotFound(err) {
1012
                                glog.V(3).Infof("SCC ClusterRoleBinding: %s already deleted.", clusterRoleBindingName)
1013
                        }
×
1014

1015
                        if err != nil {
1016
                                glog.Errorf("Error deleting SCC ClusterRoleBinding: %s, %s.", clusterRoleBindingName, err)
×
1017
                        }
×
1018
                        return nil
×
1019
                })
×
1020
        if err != nil {
×
1021
                glog.Errorf("Error deleting SCC cluster role binding. %v", err)
×
1022
        }
×
1023
}
×
1024

×
1025
func CheckSCCResoucesByName(sccName, namespace string, kubeClient kubernetes.Interface) (string, bool) {
×
1026
        saName := util.ServiceAccountNameForSCC(sccName)
×
1027
        roleName := util.RoleBindingNameForSCC(sccName)
×
1028
        roleBindingName := util.RoleBindingNameForSCC(sccName)
×
1029
        _, saErr := kubeClient.CoreV1().ServiceAccounts(namespace).Get(context.TODO(), saName, metav1.GetOptions{})
×
1030
        _, roleErr := kubeClient.RbacV1().Roles(namespace).Get(context.TODO(), roleName, metav1.GetOptions{})
×
1031
        _, roleBindingErr := kubeClient.RbacV1().RoleBindings(namespace).Get(context.TODO(), roleBindingName, metav1.GetOptions{})
×
1032
        return saName, saErr == nil && roleErr == nil && roleBindingErr == nil
×
1033
}
×
1034

×
1035
func WatchConfigMap(vmtServer *VMTServer) {
×
1036
        // Get the current value
×
1037
        currentMinNodes := configs.DefaultMinNodePoolSize
1038
        currentMaxNodes := configs.DefaultMaxNodePoolSize
1039
        currentMode, _ := configs.GetCurrentWireMockMode()
1040

×
1041
        //Check if the file /etc/kubeturbo/turbo-autoreload.config exists
×
1042
        retrySeconds := 30
×
1043
        for {
×
1044
                if verr := viper.ReadInConfig(); verr == nil {
×
1045
                        break
×
1046
                } else {
×
1047
                        if _, ok := verr.(viper.ConfigFileNotFoundError); ok {
×
1048
                                glog.V(4).Infof("Autoreload config file %s/%s not found", configs.AutoReloadConfigFilePath, configs.AutoReloadConfigFileName)
×
1049
                        } else {
×
1050
                                glog.Warningf("Invalid configuration: error %s reading autoreload config %s/%s. Retry in %d seconds.",
×
1051
                                        verr, configs.AutoReloadConfigFilePath, configs.AutoReloadConfigFileName, retrySeconds)
×
1052
                        }
×
1053
                        time.Sleep(time.Duration(retrySeconds) * time.Second)
1054
                }
×
1055
        }
1056

1057
        glog.V(1).Infof("Start watching the autoreload config file %s/%s", configs.AutoReloadConfigFilePath, configs.AutoReloadConfigFileName)
1058
        updateConfigClosure := func() {
×
1059
                configs.UpdateLoggingLevel()
×
1060
                configs.UpdateNodePoolConfig(&currentMinNodes, &currentMaxNodes)
×
1061
                configs.UpdateSystemNamespaceDetectors()
×
1062
                configs.UpdateOperatorControlledWorkloadsExclusion()
×
1063
                configs.UpdateOperatorControlledNamespacesExclusion()
×
1064
                configs.UpdateWireMockMode(currentMode)
×
1065
                configs.UpdateDaemonNamespaces()
×
1066
                configs.UpdateDaemonPods()
×
1067
                configs.UpdateChunkSendDelayMillis()
×
1068
                configs.UpdateNumObjectsPerChunk()
×
1069
        }
×
1070
        updateConfigClosure() //update the logging level during startup
×
1071
        viper.OnConfigChange(func(in fsnotify.Event) {
×
1072
                updateConfigClosure()
1073
                vmtServer.K8sTAPService.UpdateTAPServiceConfigs()
1074
        })
1075

1076
        viper.WatchConfig()
1077
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc