• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5877

10 Feb 2026 12:35AM UTC coverage: 49.319% (-0.2%) from 49.49%
#5877

Pull #4034

travis-ci

web-flow
Bump github.com/go-git/go-git/v5 in /tools/release-notes

Bumps [github.com/go-git/go-git/v5](https://github.com/go-git/go-git) from 5.16.4 to 5.16.5.
- [Release notes](https://github.com/go-git/go-git/releases)
- [Commits](https://github.com/go-git/go-git/compare/v5.16.4...v5.16.5)

---
updated-dependencies:
- dependency-name: github.com/go-git/go-git/v5
  dependency-version: 5.16.5
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #4034: Bump github.com/go-git/go-git/v5 from 5.16.4 to 5.16.5 in /tools/release-notes

14742 of 29891 relevant lines covered (49.32%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/cmd/cdi-controller/controller.go
1
package main
2

3
import (
4
        "context"
5
        "crypto/rsa"
6
        "crypto/tls"
7
        "flag"
8
        "fmt"
9
        "os"
10
        "strconv"
11

12
        "github.com/kelseyhightower/envconfig"
13
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
14
        ocpconfigv1 "github.com/openshift/api/config/v1"
15
        imagev1 "github.com/openshift/api/image/v1"
16
        routev1 "github.com/openshift/api/route/v1"
17
        "github.com/pkg/errors"
18
        "go.uber.org/zap/zapcore"
19

20
        batchv1 "k8s.io/api/batch/v1"
21
        v1 "k8s.io/api/core/v1"
22
        networkingv1 "k8s.io/api/networking/v1"
23
        extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
24
        "k8s.io/apimachinery/pkg/api/meta"
25
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26
        "k8s.io/apimachinery/pkg/fields"
27
        apiruntime "k8s.io/apimachinery/pkg/runtime"
28
        "k8s.io/client-go/kubernetes"
29
        clientgoscheme "k8s.io/client-go/kubernetes/scheme"
30
        "k8s.io/client-go/tools/clientcmd"
31
        "k8s.io/klog/v2"
32

33
        "sigs.k8s.io/controller-runtime/pkg/cache"
34
        "sigs.k8s.io/controller-runtime/pkg/client"
35
        "sigs.k8s.io/controller-runtime/pkg/client/config"
36
        logf "sigs.k8s.io/controller-runtime/pkg/log"
37
        "sigs.k8s.io/controller-runtime/pkg/log/zap"
38
        "sigs.k8s.io/controller-runtime/pkg/manager"
39
        "sigs.k8s.io/controller-runtime/pkg/manager/signals"
40
        "sigs.k8s.io/controller-runtime/pkg/metrics/server"
41

42
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
43
        forklift "kubevirt.io/containerized-data-importer-api/pkg/apis/forklift/v1beta1"
44
        "kubevirt.io/containerized-data-importer/pkg/common"
45
        "kubevirt.io/containerized-data-importer/pkg/controller"
46
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
47
        "kubevirt.io/containerized-data-importer/pkg/controller/populators"
48
        "kubevirt.io/containerized-data-importer/pkg/controller/transfer"
49
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
50
        "kubevirt.io/containerized-data-importer/pkg/util"
51
        "kubevirt.io/containerized-data-importer/pkg/util/cert"
52
        "kubevirt.io/containerized-data-importer/pkg/util/cert/fetcher"
53
        "kubevirt.io/containerized-data-importer/pkg/util/cert/generator"
54
)
55

56
const (
57
        readyFile = "/tmp/ready"
58
)
59

60
var (
61
        kubeconfig             string
62
        kubeURL                string
63
        importerImage          string
64
        clonerImage            string
65
        uploadServerImage      string
66
        uploadProxyServiceName string
67
        ovirtPopulatorImage    string
68
        configName             string
69
        pullPolicy             string
70
        verbose                string
71
        installerLabels        map[string]string
72
        log                    = logf.Log.WithName("controller")
73
        controllerEnvs         ControllerEnvs
74
        resourcesSchemeFuncs   = []func(*apiruntime.Scheme) error{
75
                clientgoscheme.AddToScheme,
76
                cdiv1.AddToScheme,
77
                extv1.AddToScheme,
78
                snapshotv1.AddToScheme,
79
                forklift.AddToScheme,
80
                imagev1.Install,
81
                ocpconfigv1.Install,
82
                routev1.Install,
83
        }
84
)
85

86
// ControllerEnvs contains environment variables read for setting custom cert paths
87
type ControllerEnvs struct {
88
        UploadServerKeyFile           string `default:"/var/run/certs/cdi-uploadserver-signer/tls.key" split_words:"true"`
89
        UploadServerCertFile          string `default:"/var/run/certs/cdi-uploadserver-signer/tls.crt" split_words:"true"`
90
        UploadClientKeyFile           string `default:"/var/run/certs/cdi-uploadserver-client-signer/tls.key" split_words:"true"`
91
        UploadClientCertFile          string `default:"/var/run/certs/cdi-uploadserver-client-signer/tls.crt" split_words:"true"`
92
        UploadServerCaBundleConfigMap string `default:"cdi-uploadserver-signer-bundle" split_words:"true"`
93
        UploadClientCaBundleConfigMap string `default:"cdi-uploadserver-client-signer-bundle" split_words:"true"`
94
}
95

96
// The importer and cloner images are obtained here along with the supported flags. IMPORTER_IMAGE, CLONER_IMAGE, and UPLOADSERVICE_IMAGE
97
// are required by the controller and will cause it to fail if not defined.
98
// Note: kubeconfig hierarchy is 1) -kubeconfig flag, 2) $KUBECONFIG exported var. If neither is
99
// specified we do an in-cluster config. For testing it's easiest to export KUBECONFIG.
100
func init() {
101
        // flags
102
        flag.StringVar(&kubeURL, "server", "", "(Optional) URL address of a remote api server.  Do not set for local clusters.")
×
103
        klog.InitFlags(nil)
×
104
        flag.Parse()
×
105

×
106
        if flag.Lookup("kubeconfig") != nil {
×
107
                kubeconfig = flag.Lookup("kubeconfig").Value.String()
×
108
        }
×
109
        importerImage = getRequiredEnvVar("IMPORTER_IMAGE")
×
110
        clonerImage = getRequiredEnvVar("CLONER_IMAGE")
×
111
        uploadServerImage = getRequiredEnvVar("UPLOADSERVER_IMAGE")
×
112
        ovirtPopulatorImage = getRequiredEnvVar("OVIRT_POPULATOR_IMAGE")
×
113
        uploadProxyServiceName = getRequiredEnvVar("UPLOADPROXY_SERVICE")
×
114
        installerLabels = map[string]string{}
×
115

×
116
        pullPolicy = common.DefaultPullPolicy
×
117
        if pp := os.Getenv(common.PullPolicy); len(pp) != 0 {
×
118
                pullPolicy = pp
×
119
        }
×
120

×
121
        // We will need to put those on every resource our controller creates
×
122
        if partOfVal := os.Getenv(common.InstallerPartOfLabel); len(partOfVal) != 0 {
123
                installerLabels[common.AppKubernetesPartOfLabel] = partOfVal
124
        }
×
125
        if versionVal := os.Getenv(common.InstallerVersionLabel); len(versionVal) != 0 {
×
126
                installerLabels[common.AppKubernetesVersionLabel] = versionVal
×
127
        }
×
128

×
129
        configName = common.ConfigName
×
130

131
        // NOTE we used to have a constant here and we're now just passing in the level directly
×
132
        // that should be fine since it was a constant and not a mutable variable
×
133
        defVerbose := fmt.Sprintf("%d", 1) // note flag values are strings
×
134
        verbose = defVerbose
×
135
        // visit actual flags passed in and if passed check -v and set verbose
×
136
        flag.Visit(func(f *flag.Flag) {
×
137
                if f.Name == "v" {
×
138
                        verbose = f.Value.String()
×
139
                }
×
140
        })
×
141
        if verbose == defVerbose {
×
142
                klog.V(1).Infof("Note: increase the -v level in the controller deployment for more detailed logging, eg. -v=%d or -v=%d\n", 2, 3)
143
        }
×
144

×
145
        // Setup metrics for our various controllers
×
146
        if err := metrics.SetupMetrics(); err != nil {
147
                klog.Errorf("failed to setup metrics: %v", err)
148
                os.Exit(1)
×
149
        }
×
150

×
151
        klog.V(3).Infof("init: complete: cdi controller will create importer using image %q\n", importerImage)
×
152
}
153

×
154
func getRequiredEnvVar(name string) string {
155
        val := os.Getenv(name)
156
        if val == "" {
×
157
                klog.Fatalf("Environment Variable %q undefined\n", name)
×
158
        }
×
159
        return val
×
160
}
×
161

×
162
func start() {
163
        klog.Info("Starting CDI controller components")
164

×
165
        namespace := util.GetNamespace()
×
166

×
167
        cfg, err := clientcmd.BuildConfigFromFlags(kubeURL, kubeconfig)
×
168
        if err != nil {
×
169
                klog.Fatalf("Unable to get kube config: %v\n", errors.WithStack(err))
×
170
        }
×
171

×
172
        k8sClient, err := kubernetes.NewForConfig(cfg)
×
173
        if err != nil {
174
                klog.Fatalf("Unable to get kube client: %v\n", errors.WithStack(err))
×
175
        }
×
176

×
177
        // Setup scheme for all resources
×
178
        scheme := apiruntime.NewScheme()
179
        for _, f := range resourcesSchemeFuncs {
180
                err := f(scheme)
×
181
                if err != nil {
×
182
                        klog.Errorf("Failed to add to scheme: %v", err)
×
183
                        os.Exit(1)
×
184
                }
×
185
        }
×
186

×
187
        // client.New() returns a client without cache
188
        // since we don't have a cached client before manager init
189
        apiClient, err := client.New(cfg, client.Options{
190
                Scheme: scheme,
191
        })
×
192
        if err != nil {
×
193
                klog.Fatalf("Unable to get uncached client: %v\n", errors.WithStack(err))
×
194
        }
×
195

×
196
        opts := manager.Options{
×
197
                LeaderElection:             true,
198
                LeaderElectionNamespace:    namespace,
×
199
                LeaderElectionID:           "cdi-controller-leader-election-helper",
×
200
                LeaderElectionResourceLock: "leases",
×
201
                Cache:                      getCacheOptions(apiClient, namespace),
×
202
                Scheme:                     scheme,
×
203
                Metrics: server.Options{
×
204
                        BindAddress:   ":8443",
×
205
                        SecureServing: true,
×
206
                        // Disable HTTP/2 to prevent rapid reset vulnerability
×
207
                        // See CVE-2023-44487, CVE-2023-39325
×
208
                        TLSOpts: []func(*tls.Config){func(c *tls.Config) {
×
209
                                c.NextProtos = []string{"http/1.1"}
×
210
                        }},
×
211
                },
×
212
        }
×
213

×
214
        mgr, err := manager.New(config.GetConfigOrDie(), opts)
×
215
        if err != nil {
×
216
                klog.Errorf("Unable to setup controller manager: %v", err)
×
217
                os.Exit(1)
×
218
        }
×
219

×
220
        uploadClientCAFetcher := &fetcher.FileCertFetcher{KeyFileName: controllerEnvs.UploadClientKeyFile, CertFileName: controllerEnvs.UploadClientCertFile}
×
221
        uploadClientBundleFetcher := &fetcher.ConfigMapCertBundleFetcher{
×
222
                Name:   controllerEnvs.UploadClientCaBundleConfigMap,
223
                Client: k8sClient.CoreV1().ConfigMaps(namespace),
224
        }
225
        uploadClientCertGenerator := &generator.FetchCertGenerator{Fetcher: uploadClientCAFetcher}
226

227
        uploadServerCAFetcher := &fetcher.FileCertFetcher{KeyFileName: controllerEnvs.UploadServerKeyFile, CertFileName: controllerEnvs.UploadServerCertFile}
×
228
        uploadServerBundleFetcher := &fetcher.ConfigMapCertBundleFetcher{
×
229
                Name:   controllerEnvs.UploadServerCaBundleConfigMap,
×
230
                Client: k8sClient.CoreV1().ConfigMaps(namespace),
×
231
        }
×
232
        uploadServerCertGenerator := &generator.FetchCertGenerator{Fetcher: uploadServerCAFetcher}
233

×
234
        if _, err := controller.NewConfigController(mgr, log, uploadProxyServiceName, configName, installerLabels); err != nil {
×
235
                klog.Errorf("Unable to setup config controller: %v", err)
×
236
                os.Exit(1)
×
237
        }
×
238

×
239
        if _, err := controller.NewStorageProfileController(mgr, log, installerLabels); err != nil {
×
240
                klog.Errorf("Unable to setup storage profiles controller: %v", err)
×
241
                os.Exit(1)
×
242
        }
×
243

×
244
        if err := dvc.CreateCommonIndexes(mgr); err != nil {
×
245
                klog.Errorf("Unable to create shared indexes: %v", err)
×
246
                os.Exit(1)
×
247
        }
×
248

×
249
        ctx := signals.SetupSignalHandler()
×
250

×
251
        // TODO: Current DV controller had threadiness 3, should we do the same here, defaults to one thread.
252
        if _, err := dvc.NewImportController(ctx, mgr, log, installerLabels); err != nil {
×
253
                klog.Errorf("Unable to setup datavolume import controller: %v", err)
×
254
                os.Exit(1)
×
255
        }
×
256
        if _, err := dvc.NewUploadController(ctx, mgr, log, installerLabels); err != nil {
257
                klog.Errorf("Unable to setup datavolume upload controller: %v", err)
×
258
                os.Exit(1)
×
259
        }
×
260
        if _, err := dvc.NewPvcCloneController(ctx, mgr, log,
×
261
                clonerImage, importerImage, pullPolicy, getTokenPublicKey(), getTokenPrivateKey(), installerLabels); err != nil {
262
                klog.Errorf("Unable to setup datavolume pvc clone controller: %v", err)
×
263
                os.Exit(1)
×
264
        }
×
265
        if _, err := dvc.NewSnapshotCloneController(ctx, mgr, log,
×
266
                clonerImage, importerImage, pullPolicy, getTokenPublicKey(), getTokenPrivateKey(), installerLabels); err != nil {
×
267
                klog.Errorf("Unable to setup datavolume snapshot clone controller: %v", err)
×
268
                os.Exit(1)
×
269
        }
×
270
        if _, err := dvc.NewPopulatorController(ctx, mgr, log, installerLabels); err != nil {
×
271
                klog.Errorf("Unable to setup datavolume external-population controller: %v", err)
×
272
                os.Exit(1)
×
273
        }
×
274

×
275
        if _, err := controller.NewImportController(mgr, log, importerImage, pullPolicy, verbose, installerLabels); err != nil {
×
276
                klog.Errorf("Unable to setup import controller: %v", err)
×
277
                os.Exit(1)
×
278
        }
×
279

×
280
        if _, err := controller.NewCloneController(mgr, log, clonerImage, pullPolicy, verbose, uploadClientCertGenerator, uploadServerBundleFetcher, getTokenPublicKey(), installerLabels); err != nil {
×
281
                klog.Errorf("Unable to setup clone controller: %v", err)
×
282
                os.Exit(1)
×
283
        }
×
284

×
285
        if _, err := controller.NewUploadController(mgr, log, uploadServerImage, pullPolicy, verbose, uploadServerCertGenerator, uploadClientBundleFetcher, installerLabels); err != nil {
×
286
                klog.Errorf("Unable to setup upload controller: %v", err)
×
287
                os.Exit(1)
288
        }
×
289

×
290
        if _, err := transfer.NewObjectTransferController(mgr, log, installerLabels); err != nil {
×
291
                klog.Errorf("Unable to setup transfer controller: %v", err)
×
292
                os.Exit(1)
293
        }
×
294

×
295
        if _, err := controller.NewDataImportCronController(mgr, log, importerImage, pullPolicy, installerLabels); err != nil {
×
296
                klog.Errorf("Unable to setup dataimportcron controller: %v", err)
×
297
                os.Exit(1)
298
        }
×
299
        if _, err := controller.NewDataSourceController(mgr, log, installerLabels); err != nil {
×
300
                klog.Errorf("Unable to setup datasource controller: %v", err)
×
301
                os.Exit(1)
×
302
        }
303
        // Populator controllers and indexes
×
304
        if err := populators.CreateCommonPopulatorIndexes(mgr); err != nil {
×
305
                klog.Errorf("Unable to create common populator indexes: %v", err)
×
306
                os.Exit(1)
×
307
        }
308
        if _, err := populators.NewImportPopulator(ctx, mgr, log, installerLabels); err != nil {
×
309
                klog.Errorf("Unable to setup import populator: %v", err)
×
310
                os.Exit(1)
×
311
        }
×
312
        if _, err := populators.NewUploadPopulator(ctx, mgr, log, installerLabels); err != nil {
×
313
                klog.Errorf("Unable to setup upload populator: %v", err)
×
314
                os.Exit(1)
×
315
        }
×
316
        if _, err := populators.NewClonePopulator(ctx, mgr, log, clonerImage, pullPolicy, installerLabels, getTokenPublicKey()); err != nil {
317
                klog.Errorf("Unable to setup clone populator: %v", err)
×
318
                os.Exit(1)
×
319
        }
×
320
        if _, err := populators.NewForkliftPopulator(ctx, mgr, log, importerImage, ovirtPopulatorImage, installerLabels); err != nil {
×
321
                klog.Errorf("Unable to setup forklift populator: %v", err)
×
322
                os.Exit(1)
×
323
        }
×
324

×
325
        klog.V(1).Infoln("created cdi controllers")
×
326

×
327
        if err := mgr.Start(ctx); err != nil {
×
328
                klog.Errorf("Error running manager: %v", err)
×
329
                os.Exit(1)
×
330
        }
×
331
}
×
332

×
333
func main() {
×
334
        defer klog.Flush()
×
335
        debug := false
×
336
        verbosityLevel, err := strconv.Atoi(verbose)
×
337
        if err == nil && verbosityLevel > 1 {
338
                debug = true
×
339
        }
×
340
        err = envconfig.Process("", &controllerEnvs)
×
341
        if err != nil {
×
342
                klog.Fatalf("Unable to get environment variables: %v\n", errors.WithStack(err))
×
343
        }
344

×
345
        logf.SetLogger(zap.New(zap.Level(zapcore.Level(-1*verbosityLevel)), zap.UseDevMode(debug)))
×
346
        logf.Log.WithName("main").Info("Verbosity level", "verbose", verbose, "debug", debug)
×
347

×
348
        if err = createReadyFile(); err != nil {
×
349
                klog.Fatalf("Error creating ready file: %+v", err)
×
350
        }
351

352
        start()
×
353

×
354
        deleteReadyFile()
×
355

×
356
        klog.V(2).Infoln("cdi controller exited")
×
357
}
×
358

×
359
func createReadyFile() error {
×
360
        f, err := os.Create(readyFile)
×
361
        if err != nil {
×
362
                return err
×
363
        }
364
        defer f.Close()
×
365
        return nil
×
366
}
×
367

×
368
func deleteReadyFile() {
×
369
        os.Remove(readyFile)
×
370
}
371

×
372
func getTokenPublicKey() *rsa.PublicKey {
×
373
        keyBytes, err := os.ReadFile(controller.TokenPublicKeyPath)
×
374
        if err != nil {
×
375
                klog.Fatalf("Error reading apiserver public key")
×
376
        }
377

378
        key, err := controller.DecodePublicKey(keyBytes)
×
379
        if err != nil {
×
380
                klog.Fatalf("Error decoding public key")
×
381
        }
×
382

×
383
        return key
×
384
}
×
385

386
func getTokenPrivateKey() *rsa.PrivateKey {
387
        bytes, err := os.ReadFile(controller.TokenPrivateKeyPath)
×
388
        if err != nil {
×
389
                klog.Fatalf("Error reading private key")
×
390
        }
391

×
392
        obj, err := cert.ParsePrivateKeyPEM(bytes)
×
393
        if err != nil {
×
394
                klog.Fatalf("Error decoding private key")
×
395
        }
×
396

397
        key, ok := obj.(*rsa.PrivateKey)
×
398
        if !ok {
×
399
                klog.Fatalf("Invalid private key format")
×
400
        }
×
401

402
        return key
×
403
}
404

405
// Restricts some types in the cache's ListWatch to specific fields/labels per GVK at the specified object,
×
406
// other types will continue working normally.
×
407
// Note: objects you read once with the controller runtime client are cached.
×
408
// TODO: Make our watches way more specific using labels, for example,
×
409
// at the point of writing this, we don't care about VolumeSnapshots without the CDI label
×
410
func getCacheOptions(apiClient client.Client, cdiNamespace string) cache.Options {
411
        namespaceSelector := fields.Set{"metadata.namespace": cdiNamespace}.AsSelector()
×
412

×
413
        cacheOptions := cache.Options{
×
414
                ByObject: map[client.Object]cache.ByObject{
×
415
                        &networkingv1.Ingress{}: {
416
                                Field: namespaceSelector,
×
417
                        },
×
418
                        &batchv1.CronJob{}: {
×
419
                                Field: namespaceSelector,
×
420
                        },
421
                        &batchv1.Job{}: {
×
422
                                Field: namespaceSelector,
423
                        },
424
                        &v1.ConfigMap{}: {
425
                                Field: namespaceSelector,
426
                        },
427
                        &v1.Secret{}: {
428
                                Field: namespaceSelector,
429
                        },
×
430
                },
×
431
        }
×
432

×
433
        cacheOptionsByObjectForOpenshift := map[client.Object]cache.ByObject{
×
434
                &routev1.Route{}: {
×
435
                        Field: namespaceSelector,
×
436
                },
×
437
        }
×
438

×
439
        // Currently controller-runtime will fail if types in here are not installed in the cluster
×
440
        // https://github.com/kubernetes-sigs/controller-runtime/issues/2456
×
441
        if isOpenShift(apiClient) {
×
442
                for k, v := range cacheOptionsByObjectForOpenshift {
×
443
                        cacheOptions.ByObject[k] = v
×
444
                }
×
445
        }
×
446

×
447
        return cacheOptions
×
448
}
×
449

×
450
func isOpenShift(apiClient client.Client) bool {
×
451
        clusterVersion := &ocpconfigv1.ClusterVersion{
×
452
                ObjectMeta: metav1.ObjectMeta{
×
453
                        Name: "version",
×
454
                },
×
455
        }
×
456
        if err := apiClient.Get(context.TODO(), client.ObjectKeyFromObject(clusterVersion), clusterVersion); err != nil {
×
457
                if !meta.IsNoMatchError(err) {
×
458
                        klog.Errorf("Error getting clusterVersion: %v", err)
×
459
                        os.Exit(1)
×
460
                }
×
461
                return false
×
462
        }
×
463

×
464
        return true
465
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc