• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5473

15 Jul 2025 12:49PM UTC coverage: 59.502% (-0.02%) from 59.518%
#5473

push

travis-ci

web-flow
metrics-server: use SecureServing option (#3837)

As part of an effort to standardize metric exposure across KubeVirt
components on port 8443, we are transitioning to HTTPS with TLS
encryption for the metrics-server.

To facilitate this, we leverage the controller-runtime's SecureServing
option, which creates a self-signed certificate and configures it as the
server certificate for the metrics endpoint when no external certificate
is provided[1].

Subsequent PRs will replace this self-signed certificate with a CDI
generated one to enable a fully trusted and secure connection between
the Prometheus instance and the target metrics endpoints as specified by
the CDI ServiceMonitor. Until that integration is complete, the
ServiceMonitor will be configured with insecureSkipVerify to allow
scraping despite the untrusted certificate.

[1] https://github.com/kubernetes-sigs/controller-runtime/pull/2407

Signed-off-by: Adi Aloni <aaloni@redhat.com>

3 of 20 new or added lines in 5 files covered. (15.0%)

2 existing lines in 1 file now uncovered.

17101 of 28740 relevant lines covered (59.5%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/cmd/cdi-controller/controller.go
1
package main
2

3
import (
4
        "context"
5
        "crypto/rsa"
6
        "crypto/tls"
7
        "flag"
8
        "fmt"
9
        "os"
10
        "strconv"
11

12
        "github.com/kelseyhightower/envconfig"
13
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
14
        ocpconfigv1 "github.com/openshift/api/config/v1"
15
        imagev1 "github.com/openshift/api/image/v1"
16
        routev1 "github.com/openshift/api/route/v1"
17
        "github.com/pkg/errors"
18
        "go.uber.org/zap/zapcore"
19

20
        batchv1 "k8s.io/api/batch/v1"
21
        v1 "k8s.io/api/core/v1"
22
        networkingv1 "k8s.io/api/networking/v1"
23
        extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
24
        "k8s.io/apimachinery/pkg/api/meta"
25
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26
        "k8s.io/apimachinery/pkg/fields"
27
        apiruntime "k8s.io/apimachinery/pkg/runtime"
28
        "k8s.io/client-go/kubernetes"
29
        clientgoscheme "k8s.io/client-go/kubernetes/scheme"
30
        "k8s.io/client-go/tools/clientcmd"
31
        "k8s.io/klog/v2"
32

33
        "sigs.k8s.io/controller-runtime/pkg/cache"
34
        "sigs.k8s.io/controller-runtime/pkg/client"
35
        "sigs.k8s.io/controller-runtime/pkg/client/config"
36
        logf "sigs.k8s.io/controller-runtime/pkg/log"
37
        "sigs.k8s.io/controller-runtime/pkg/log/zap"
38
        "sigs.k8s.io/controller-runtime/pkg/manager"
39
        "sigs.k8s.io/controller-runtime/pkg/manager/signals"
40
        "sigs.k8s.io/controller-runtime/pkg/metrics/server"
41

42
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
43
        forklift "kubevirt.io/containerized-data-importer-api/pkg/apis/forklift/v1beta1"
44
        "kubevirt.io/containerized-data-importer/pkg/common"
45
        "kubevirt.io/containerized-data-importer/pkg/controller"
46
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
47
        "kubevirt.io/containerized-data-importer/pkg/controller/populators"
48
        "kubevirt.io/containerized-data-importer/pkg/controller/transfer"
49
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
50
        "kubevirt.io/containerized-data-importer/pkg/util"
51
        "kubevirt.io/containerized-data-importer/pkg/util/cert"
52
        "kubevirt.io/containerized-data-importer/pkg/util/cert/fetcher"
53
        "kubevirt.io/containerized-data-importer/pkg/util/cert/generator"
54
)
55

56
// readyFile is the path of the marker file that main creates before calling
// start() and removes once start() has returned.
const (
        readyFile = "/tmp/ready"
)

// Package-level configuration shared by init(), main() and start().
//
// kubeconfig, kubeURL, the *Image values, uploadProxyServiceName, configName,
// pullPolicy, verbose and installerLabels are all assigned in init() from
// command-line flags and environment variables. controllerEnvs is populated
// in main() via envconfig. resourcesSchemeFuncs lists the scheme registration
// functions that start() applies, in order, to the scheme it hands to the
// manager and to the uncached client.
var (
        kubeconfig             string
        kubeURL                string
        importerImage          string
        clonerImage            string
        uploadServerImage      string
        uploadProxyServiceName string
        ovirtPopulatorImage    string
        configName             string
        pullPolicy             string
        verbose                string
        installerLabels        map[string]string
        log                    = logf.Log.WithName("controller")
        controllerEnvs         ControllerEnvs
        resourcesSchemeFuncs   = []func(*apiruntime.Scheme) error{
                clientgoscheme.AddToScheme,
                cdiv1.AddToScheme,
                extv1.AddToScheme,
                snapshotv1.AddToScheme,
                forklift.AddToScheme,
                imagev1.Install,
                ocpconfigv1.Install,
                routev1.Install,
        }
)
85

86
// ControllerEnvs contains environment variables read for setting custom cert paths.
//
// main() fills it with envconfig.Process("", &controllerEnvs); every field
// falls back to the value in its `default` tag when the variable is not set.
// NOTE(review): with split_words the variable names are presumably the
// upper-snake-case form of the field names (e.g. UPLOAD_SERVER_KEY_FILE) —
// confirm against the envconfig documentation.
//
// The *KeyFile/*CertFile pairs are the files read by the FileCertFetchers
// created in start(); the *CaBundleConfigMap values are the names of the
// ConfigMaps read by the ConfigMapCertBundleFetchers created there.
type ControllerEnvs struct {
        UploadServerKeyFile           string `default:"/var/run/certs/cdi-uploadserver-signer/tls.key" split_words:"true"`
        UploadServerCertFile          string `default:"/var/run/certs/cdi-uploadserver-signer/tls.crt" split_words:"true"`
        UploadClientKeyFile           string `default:"/var/run/certs/cdi-uploadserver-client-signer/tls.key" split_words:"true"`
        UploadClientCertFile          string `default:"/var/run/certs/cdi-uploadserver-client-signer/tls.crt" split_words:"true"`
        UploadServerCaBundleConfigMap string `default:"cdi-uploadserver-signer-bundle" split_words:"true"`
        UploadClientCaBundleConfigMap string `default:"cdi-uploadserver-client-signer-bundle" split_words:"true"`
}
95

96
// The importer and cloner images are obtained here along with the supported flags. IMPORTER_IMAGE, CLONER_IMAGE, and UPLOADSERVICE_IMAGE
97
// are required by the controller and will cause it to fail if not defined.
98
// Note: kubeconfig hierarchy is 1) -kubeconfig flag, 2) $KUBECONFIG exported var. If neither is
99
// specified we do an in-cluster config. For testing it's easiest to export KUBECONFIG.
100
func init() {
×
101
        // flags
×
102
        flag.StringVar(&kubeURL, "server", "", "(Optional) URL address of a remote api server.  Do not set for local clusters.")
×
103
        klog.InitFlags(nil)
×
104
        flag.Parse()
×
105

×
106
        if flag.Lookup("kubeconfig") != nil {
×
107
                kubeconfig = flag.Lookup("kubeconfig").Value.String()
×
108
        }
×
109
        importerImage = getRequiredEnvVar("IMPORTER_IMAGE")
×
110
        clonerImage = getRequiredEnvVar("CLONER_IMAGE")
×
111
        uploadServerImage = getRequiredEnvVar("UPLOADSERVER_IMAGE")
×
112
        ovirtPopulatorImage = getRequiredEnvVar("OVIRT_POPULATOR_IMAGE")
×
113
        uploadProxyServiceName = getRequiredEnvVar("UPLOADPROXY_SERVICE")
×
114
        installerLabels = map[string]string{}
×
115

×
116
        pullPolicy = common.DefaultPullPolicy
×
117
        if pp := os.Getenv(common.PullPolicy); len(pp) != 0 {
×
118
                pullPolicy = pp
×
119
        }
×
120

121
        // We will need to put those on every resource our controller creates
122
        if partOfVal := os.Getenv(common.InstallerPartOfLabel); len(partOfVal) != 0 {
×
123
                installerLabels[common.AppKubernetesPartOfLabel] = partOfVal
×
124
        }
×
125
        if versionVal := os.Getenv(common.InstallerVersionLabel); len(versionVal) != 0 {
×
126
                installerLabels[common.AppKubernetesVersionLabel] = versionVal
×
127
        }
×
128

129
        configName = common.ConfigName
×
130

×
131
        // NOTE we used to have a constant here and we're now just passing in the level directly
×
132
        // that should be fine since it was a constant and not a mutable variable
×
133
        defVerbose := fmt.Sprintf("%d", 1) // note flag values are strings
×
134
        verbose = defVerbose
×
135
        // visit actual flags passed in and if passed check -v and set verbose
×
136
        flag.Visit(func(f *flag.Flag) {
×
137
                if f.Name == "v" {
×
138
                        verbose = f.Value.String()
×
139
                }
×
140
        })
141
        if verbose == defVerbose {
×
142
                klog.V(1).Infof("Note: increase the -v level in the controller deployment for more detailed logging, eg. -v=%d or -v=%d\n", 2, 3)
×
143
        }
×
144

145
        // Setup metrics for our various controllers
146
        if err := metrics.SetupMetrics(); err != nil {
×
147
                klog.Errorf("failed to setup metrics: %v", err)
×
148
                os.Exit(1)
×
149
        }
×
150

151
        klog.V(3).Infof("init: complete: cdi controller will create importer using image %q\n", importerImage)
×
152
}
153

154
func getRequiredEnvVar(name string) string {
×
155
        val := os.Getenv(name)
×
156
        if val == "" {
×
157
                klog.Fatalf("Environment Variable %q undefined\n", name)
×
158
        }
×
159
        return val
×
160
}
161

162
// start wires up and runs the CDI controller manager.
//
// It builds the Kubernetes clients and the runtime scheme, creates a
// controller-runtime manager (leader election enabled, metrics served on
// :8443 with SecureServing), registers every CDI controller and populator on
// it, and finally calls mgr.Start with the signal-handler context. Any setup
// failure is logged and terminates the process (klog.Fatalf or os.Exit(1)).
func start() {
        klog.Info("Starting CDI controller components")

        namespace := util.GetNamespace()

        // cfg honours the -server and -kubeconfig values collected in init().
        cfg, err := clientcmd.BuildConfigFromFlags(kubeURL, kubeconfig)
        if err != nil {
                klog.Fatalf("Unable to get kube config: %v\n", errors.WithStack(err))
        }

        k8sClient, err := kubernetes.NewForConfig(cfg)
        if err != nil {
                klog.Fatalf("Unable to get kube client: %v\n", errors.WithStack(err))
        }

        // Setup scheme for all resources
        scheme := apiruntime.NewScheme()
        for _, f := range resourcesSchemeFuncs {
                err := f(scheme)
                if err != nil {
                        klog.Errorf("Failed to add to scheme: %v", err)
                        os.Exit(1)
                }
        }

        // client.New() returns a client without cache
        // since we don't have a cached client before manager init
        apiClient, err := client.New(cfg, client.Options{
                Scheme: scheme,
        })
        if err != nil {
                klog.Fatalf("Unable to get uncached client: %v\n", errors.WithStack(err))
        }

        opts := manager.Options{
                LeaderElection:             true,
                LeaderElectionNamespace:    namespace,
                LeaderElectionID:           "cdi-controller-leader-election-helper",
                LeaderElectionResourceLock: "leases",
                Cache:                      getCacheOptions(apiClient, namespace),
                Scheme:                     scheme,
                Metrics: server.Options{
                        BindAddress:   ":8443",
                        SecureServing: true,
                        // Disable HTTP/2 to prevent rapid reset vulnerability
                        // See CVE-2023-44487, CVE-2023-39325
                        TLSOpts: []func(*tls.Config){func(c *tls.Config) {
                                c.NextProtos = []string{"http/1.1"}
                        }},
                },
        }

        // NOTE(review): the manager is built from config.GetConfigOrDie() rather
        // than from cfg above, so it does not use the value of the -server flag —
        // confirm this is intentional.
        mgr, err := manager.New(config.GetConfigOrDie(), opts)
        if err != nil {
                klog.Errorf("Unable to setup controller manager: %v", err)
                os.Exit(1)
        }

        // Upload client side: certificate generator backed by the key/cert files,
        // plus a fetcher for the CA bundle ConfigMap in the CDI namespace.
        uploadClientCAFetcher := &fetcher.FileCertFetcher{KeyFileName: controllerEnvs.UploadClientKeyFile, CertFileName: controllerEnvs.UploadClientCertFile}
        uploadClientBundleFetcher := &fetcher.ConfigMapCertBundleFetcher{
                Name:   controllerEnvs.UploadClientCaBundleConfigMap,
                Client: k8sClient.CoreV1().ConfigMaps(namespace),
        }
        uploadClientCertGenerator := &generator.FetchCertGenerator{Fetcher: uploadClientCAFetcher}

        // Upload server side: same arrangement with the server signer files.
        uploadServerCAFetcher := &fetcher.FileCertFetcher{KeyFileName: controllerEnvs.UploadServerKeyFile, CertFileName: controllerEnvs.UploadServerCertFile}
        uploadServerBundleFetcher := &fetcher.ConfigMapCertBundleFetcher{
                Name:   controllerEnvs.UploadServerCaBundleConfigMap,
                Client: k8sClient.CoreV1().ConfigMaps(namespace),
        }
        uploadServerCertGenerator := &generator.FetchCertGenerator{Fetcher: uploadServerCAFetcher}

        if _, err := controller.NewConfigController(mgr, log, uploadProxyServiceName, configName, installerLabels); err != nil {
                klog.Errorf("Unable to setup config controller: %v", err)
                os.Exit(1)
        }

        if _, err := controller.NewStorageProfileController(mgr, log, installerLabels); err != nil {
                klog.Errorf("Unable to setup storage profiles controller: %v", err)
                os.Exit(1)
        }

        if err := dvc.CreateCommonIndexes(mgr); err != nil {
                klog.Errorf("Unable to create shared indexes: %v", err)
                os.Exit(1)
        }

        // ctx is shared by the constructors below and by mgr.Start.
        ctx := signals.SetupSignalHandler()

        // TODO: Current DV controller had threadiness 3, should we do the same here, defaults to one thread.
        if _, err := dvc.NewImportController(ctx, mgr, log, installerLabels); err != nil {
                klog.Errorf("Unable to setup datavolume import controller: %v", err)
                os.Exit(1)
        }
        if _, err := dvc.NewUploadController(ctx, mgr, log, installerLabels); err != nil {
                klog.Errorf("Unable to setup datavolume upload controller: %v", err)
                os.Exit(1)
        }
        // Each getTokenPublicKey()/getTokenPrivateKey() call re-reads the key file.
        if _, err := dvc.NewPvcCloneController(ctx, mgr, log,
                clonerImage, importerImage, pullPolicy, getTokenPublicKey(), getTokenPrivateKey(), installerLabels); err != nil {
                klog.Errorf("Unable to setup datavolume pvc clone controller: %v", err)
                os.Exit(1)
        }
        if _, err := dvc.NewSnapshotCloneController(ctx, mgr, log,
                clonerImage, importerImage, pullPolicy, getTokenPublicKey(), getTokenPrivateKey(), installerLabels); err != nil {
                klog.Errorf("Unable to setup datavolume snapshot clone controller: %v", err)
                os.Exit(1)
        }
        if _, err := dvc.NewPopulatorController(ctx, mgr, log, installerLabels); err != nil {
                klog.Errorf("Unable to setup datavolume external-population controller: %v", err)
                os.Exit(1)
        }

        if _, err := controller.NewImportController(mgr, log, importerImage, pullPolicy, verbose, installerLabels); err != nil {
                klog.Errorf("Unable to setup import controller: %v", err)
                os.Exit(1)
        }

        if _, err := controller.NewCloneController(mgr, log, clonerImage, pullPolicy, verbose, uploadClientCertGenerator, uploadServerBundleFetcher, getTokenPublicKey(), installerLabels); err != nil {
                klog.Errorf("Unable to setup clone controller: %v", err)
                os.Exit(1)
        }

        if _, err := controller.NewUploadController(mgr, log, uploadServerImage, pullPolicy, verbose, uploadServerCertGenerator, uploadClientBundleFetcher, installerLabels); err != nil {
                klog.Errorf("Unable to setup upload controller: %v", err)
                os.Exit(1)
        }

        if _, err := transfer.NewObjectTransferController(mgr, log, installerLabels); err != nil {
                klog.Errorf("Unable to setup transfer controller: %v", err)
                os.Exit(1)
        }

        if _, err := controller.NewDataImportCronController(mgr, log, importerImage, pullPolicy, installerLabels); err != nil {
                klog.Errorf("Unable to setup dataimportcron controller: %v", err)
                os.Exit(1)
        }
        if _, err := controller.NewDataSourceController(mgr, log, installerLabels); err != nil {
                klog.Errorf("Unable to setup datasource controller: %v", err)
                os.Exit(1)
        }
        // Populator controllers and indexes
        if err := populators.CreateCommonPopulatorIndexes(mgr); err != nil {
                klog.Errorf("Unable to create common populator indexes: %v", err)
                os.Exit(1)
        }
        if _, err := populators.NewImportPopulator(ctx, mgr, log, installerLabels); err != nil {
                klog.Errorf("Unable to setup import populator: %v", err)
                os.Exit(1)
        }
        if _, err := populators.NewUploadPopulator(ctx, mgr, log, installerLabels); err != nil {
                klog.Errorf("Unable to setup upload populator: %v", err)
                os.Exit(1)
        }
        if _, err := populators.NewClonePopulator(ctx, mgr, log, clonerImage, pullPolicy, installerLabels, getTokenPublicKey()); err != nil {
                klog.Errorf("Unable to setup clone populator: %v", err)
                os.Exit(1)
        }
        if _, err := populators.NewForkliftPopulator(ctx, mgr, log, importerImage, ovirtPopulatorImage, installerLabels); err != nil {
                klog.Errorf("Unable to setup forklift populator: %v", err)
                os.Exit(1)
        }

        klog.V(1).Infoln("created cdi controllers")

        if err := mgr.Start(ctx); err != nil {
                klog.Errorf("Error running manager: %v", err)
                os.Exit(1)
        }
}
332

333
func main() {
×
334
        defer klog.Flush()
×
335
        debug := false
×
336
        verbosityLevel, err := strconv.Atoi(verbose)
×
337
        if err == nil && verbosityLevel > 1 {
×
338
                debug = true
×
339
        }
×
340
        err = envconfig.Process("", &controllerEnvs)
×
341
        if err != nil {
×
342
                klog.Fatalf("Unable to get environment variables: %v\n", errors.WithStack(err))
×
343
        }
×
344

345
        logf.SetLogger(zap.New(zap.Level(zapcore.Level(-1*verbosityLevel)), zap.UseDevMode(debug)))
×
346
        logf.Log.WithName("main").Info("Verbosity level", "verbose", verbose, "debug", debug)
×
347

×
348
        if err = createReadyFile(); err != nil {
×
349
                klog.Fatalf("Error creating ready file: %+v", err)
×
350
        }
×
351

352
        start()
×
353

×
354
        deleteReadyFile()
×
355

×
356
        klog.V(2).Infoln("cdi controller exited")
×
357
}
358

359
func createReadyFile() error {
×
360
        f, err := os.Create(readyFile)
×
361
        if err != nil {
×
362
                return err
×
363
        }
×
364
        defer f.Close()
×
365
        return nil
×
366
}
367

368
func deleteReadyFile() {
×
369
        os.Remove(readyFile)
×
370
}
×
371

372
func getTokenPublicKey() *rsa.PublicKey {
×
373
        keyBytes, err := os.ReadFile(controller.TokenPublicKeyPath)
×
374
        if err != nil {
×
375
                klog.Fatalf("Error reading apiserver public key")
×
376
        }
×
377

378
        key, err := controller.DecodePublicKey(keyBytes)
×
379
        if err != nil {
×
380
                klog.Fatalf("Error decoding public key")
×
381
        }
×
382

383
        return key
×
384
}
385

386
func getTokenPrivateKey() *rsa.PrivateKey {
×
387
        bytes, err := os.ReadFile(controller.TokenPrivateKeyPath)
×
388
        if err != nil {
×
389
                klog.Fatalf("Error reading private key")
×
390
        }
×
391

392
        obj, err := cert.ParsePrivateKeyPEM(bytes)
×
393
        if err != nil {
×
394
                klog.Fatalf("Error decoding private key")
×
395
        }
×
396

397
        key, ok := obj.(*rsa.PrivateKey)
×
398
        if !ok {
×
399
                klog.Fatalf("Invalid private key format")
×
400
        }
×
401

402
        return key
×
403
}
404

405
// Restricts some types in the cache's ListWatch to specific fields/labels per GVK at the specified object,
406
// other types will continue working normally.
407
// Note: objects you read once with the controller runtime client are cached.
408
// TODO: Make our watches way more specific using labels, for example,
409
// at the point of writing this, we don't care about VolumeSnapshots without the CDI label
410
func getCacheOptions(apiClient client.Client, cdiNamespace string) cache.Options {
×
411
        namespaceSelector := fields.Set{"metadata.namespace": cdiNamespace}.AsSelector()
×
412

×
413
        cacheOptions := cache.Options{
×
414
                ByObject: map[client.Object]cache.ByObject{
×
415
                        &networkingv1.Ingress{}: {
×
416
                                Field: namespaceSelector,
×
417
                        },
×
418
                        &batchv1.CronJob{}: {
×
419
                                Field: namespaceSelector,
×
420
                        },
×
421
                        &batchv1.Job{}: {
×
422
                                Field: namespaceSelector,
×
423
                        },
×
424
                        &v1.ConfigMap{}: {
×
425
                                Field: namespaceSelector,
×
426
                        },
×
427
                        &v1.Secret{}: {
×
428
                                Field: namespaceSelector,
×
429
                        },
×
430
                },
×
431
        }
×
432

×
433
        cacheOptionsByObjectForOpenshift := map[client.Object]cache.ByObject{
×
434
                &routev1.Route{}: {
×
435
                        Field: namespaceSelector,
×
436
                },
×
437
        }
×
438

×
439
        // Currently controller-runtime will fail if types in here are not installed in the cluster
×
440
        // https://github.com/kubernetes-sigs/controller-runtime/issues/2456
×
441
        if isOpenShift(apiClient) {
×
442
                for k, v := range cacheOptionsByObjectForOpenshift {
×
443
                        cacheOptions.ByObject[k] = v
×
444
                }
×
445
        }
446

447
        return cacheOptions
×
448
}
449

450
func isOpenShift(apiClient client.Client) bool {
×
451
        clusterVersion := &ocpconfigv1.ClusterVersion{
×
452
                ObjectMeta: metav1.ObjectMeta{
×
453
                        Name: "version",
×
454
                },
×
455
        }
×
456
        if err := apiClient.Get(context.TODO(), client.ObjectKeyFromObject(clusterVersion), clusterVersion); err != nil {
×
457
                if !meta.IsNoMatchError(err) {
×
458
                        klog.Errorf("Error getting clusterVersion: %v", err)
×
459
                        os.Exit(1)
×
460
                }
×
461
                return false
×
462
        }
463

464
        return true
×
465
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc