• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4766

07 Jul 2024 11:44AM UTC coverage: 58.985% (+0.03%) from 58.956%
#4766

push

travis-ci

web-flow
Run bazelisk run //robots/cmd/uploader:uploader -- -workspace /home/prow/go/src/github.com/kubevirt/project-infra/../containerized-data-importer/WORKSPACE -dry-run=false (#3334)

Signed-off-by: kubevirt-bot <kubevirtbot@redhat.com>

16445 of 27880 relevant lines covered (58.98%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/cmd/cdi-controller/controller.go
1
package main
2

3
import (
4
        "context"
5
        "crypto/rsa"
6
        "flag"
7
        "fmt"
8
        "os"
9
        "strconv"
10

11
        "github.com/kelseyhightower/envconfig"
12
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
13
        ocpconfigv1 "github.com/openshift/api/config/v1"
14
        imagev1 "github.com/openshift/api/image/v1"
15
        routev1 "github.com/openshift/api/route/v1"
16
        "github.com/pkg/errors"
17
        "go.uber.org/zap/zapcore"
18

19
        batchv1 "k8s.io/api/batch/v1"
20
        v1 "k8s.io/api/core/v1"
21
        networkingv1 "k8s.io/api/networking/v1"
22
        extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
23
        "k8s.io/apimachinery/pkg/api/meta"
24
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25
        "k8s.io/apimachinery/pkg/fields"
26
        apiruntime "k8s.io/apimachinery/pkg/runtime"
27
        "k8s.io/client-go/kubernetes"
28
        clientgoscheme "k8s.io/client-go/kubernetes/scheme"
29
        "k8s.io/client-go/tools/clientcmd"
30
        "k8s.io/klog/v2"
31

32
        "sigs.k8s.io/controller-runtime/pkg/cache"
33
        "sigs.k8s.io/controller-runtime/pkg/client"
34
        "sigs.k8s.io/controller-runtime/pkg/client/config"
35
        logf "sigs.k8s.io/controller-runtime/pkg/log"
36
        "sigs.k8s.io/controller-runtime/pkg/log/zap"
37
        "sigs.k8s.io/controller-runtime/pkg/manager"
38
        "sigs.k8s.io/controller-runtime/pkg/manager/signals"
39

40
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
41
        forklift "kubevirt.io/containerized-data-importer-api/pkg/apis/forklift/v1beta1"
42
        "kubevirt.io/containerized-data-importer/pkg/common"
43
        "kubevirt.io/containerized-data-importer/pkg/controller"
44
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
45
        "kubevirt.io/containerized-data-importer/pkg/controller/populators"
46
        "kubevirt.io/containerized-data-importer/pkg/controller/transfer"
47
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
48
        "kubevirt.io/containerized-data-importer/pkg/util"
49
        "kubevirt.io/containerized-data-importer/pkg/util/cert"
50
        "kubevirt.io/containerized-data-importer/pkg/util/cert/fetcher"
51
        "kubevirt.io/containerized-data-importer/pkg/util/cert/generator"
52
)
53

54
const (
55
        readyFile = "/tmp/ready"
56
)
57

58
var (
59
        kubeconfig             string
60
        kubeURL                string
61
        importerImage          string
62
        clonerImage            string
63
        uploadServerImage      string
64
        uploadProxyServiceName string
65
        ovirtPopulatorImage    string
66
        configName             string
67
        pullPolicy             string
68
        verbose                string
69
        installerLabels        map[string]string
70
        log                    = logf.Log.WithName("controller")
71
        controllerEnvs         ControllerEnvs
72
        resourcesSchemeFuncs   = []func(*apiruntime.Scheme) error{
73
                clientgoscheme.AddToScheme,
74
                cdiv1.AddToScheme,
75
                extv1.AddToScheme,
76
                snapshotv1.AddToScheme,
77
                forklift.AddToScheme,
78
                imagev1.Install,
79
                ocpconfigv1.Install,
80
                routev1.Install,
81
        }
82
)
83

84
// ControllerEnvs contains environment variables read for setting custom cert paths
85
type ControllerEnvs struct {
86
        UploadServerKeyFile           string `default:"/var/run/certs/cdi-uploadserver-signer/tls.key" split_words:"true"`
87
        UploadServerCertFile          string `default:"/var/run/certs/cdi-uploadserver-signer/tls.crt" split_words:"true"`
88
        UploadClientKeyFile           string `default:"/var/run/certs/cdi-uploadserver-client-signer/tls.key" split_words:"true"`
89
        UploadClientCertFile          string `default:"/var/run/certs/cdi-uploadserver-client-signer/tls.crt" split_words:"true"`
90
        UploadServerCaBundleConfigMap string `default:"cdi-uploadserver-signer-bundle" split_words:"true"`
91
        UploadClientCaBundleConfigMap string `default:"cdi-uploadserver-client-signer-bundle" split_words:"true"`
92
}
93

94
// The importer and cloner images are obtained here along with the supported flags. IMPORTER_IMAGE, CLONER_IMAGE, and UPLOADSERVICE_IMAGE
95
// are required by the controller and will cause it to fail if not defined.
96
// Note: kubeconfig hierarchy is 1) -kubeconfig flag, 2) $KUBECONFIG exported var. If neither is
97
// specified we do an in-cluster config. For testing it's easiest to export KUBECONFIG.
98
func init() {
×
99
        // flags
×
100
        flag.StringVar(&kubeURL, "server", "", "(Optional) URL address of a remote api server.  Do not set for local clusters.")
×
101
        klog.InitFlags(nil)
×
102
        flag.Parse()
×
103

×
104
        if flag.Lookup("kubeconfig") != nil {
×
105
                kubeconfig = flag.Lookup("kubeconfig").Value.String()
×
106
        }
×
107
        importerImage = getRequiredEnvVar("IMPORTER_IMAGE")
×
108
        clonerImage = getRequiredEnvVar("CLONER_IMAGE")
×
109
        uploadServerImage = getRequiredEnvVar("UPLOADSERVER_IMAGE")
×
110
        ovirtPopulatorImage = getRequiredEnvVar("OVIRT_POPULATOR_IMAGE")
×
111
        uploadProxyServiceName = getRequiredEnvVar("UPLOADPROXY_SERVICE")
×
112
        installerLabels = map[string]string{}
×
113

×
114
        pullPolicy = common.DefaultPullPolicy
×
115
        if pp := os.Getenv(common.PullPolicy); len(pp) != 0 {
×
116
                pullPolicy = pp
×
117
        }
×
118

119
        // We will need to put those on every resource our controller creates
120
        if partOfVal := os.Getenv(common.InstallerPartOfLabel); len(partOfVal) != 0 {
×
121
                installerLabels[common.AppKubernetesPartOfLabel] = partOfVal
×
122
        }
×
123
        if versionVal := os.Getenv(common.InstallerVersionLabel); len(versionVal) != 0 {
×
124
                installerLabels[common.AppKubernetesVersionLabel] = versionVal
×
125
        }
×
126

127
        configName = common.ConfigName
×
128

×
129
        // NOTE we used to have a constant here and we're now just passing in the level directly
×
130
        // that should be fine since it was a constant and not a mutable variable
×
131
        defVerbose := fmt.Sprintf("%d", 1) // note flag values are strings
×
132
        verbose = defVerbose
×
133
        // visit actual flags passed in and if passed check -v and set verbose
×
134
        flag.Visit(func(f *flag.Flag) {
×
135
                if f.Name == "v" {
×
136
                        verbose = f.Value.String()
×
137
                }
×
138
        })
139
        if verbose == defVerbose {
×
140
                klog.V(1).Infof("Note: increase the -v level in the controller deployment for more detailed logging, eg. -v=%d or -v=%d\n", 2, 3)
×
141
        }
×
142

143
        // Setup metrics for our various controllers
144
        if err := metrics.SetupMetrics(); err != nil {
×
145
                klog.Errorf("failed to setup metrics: %v", err)
×
146
                os.Exit(1)
×
147
        }
×
148

149
        klog.V(3).Infof("init: complete: cdi controller will create importer using image %q\n", importerImage)
×
150
}
151

152
func getRequiredEnvVar(name string) string {
×
153
        val := os.Getenv(name)
×
154
        if val == "" {
×
155
                klog.Fatalf("Environment Variable %q undefined\n", name)
×
156
        }
×
157
        return val
×
158
}
159

160
func start() {
×
161
        klog.Info("Starting CDI controller components")
×
162

×
163
        namespace := util.GetNamespace()
×
164

×
165
        cfg, err := clientcmd.BuildConfigFromFlags(kubeURL, kubeconfig)
×
166
        if err != nil {
×
167
                klog.Fatalf("Unable to get kube config: %v\n", errors.WithStack(err))
×
168
        }
×
169

170
        k8sClient, err := kubernetes.NewForConfig(cfg)
×
171
        if err != nil {
×
172
                klog.Fatalf("Unable to get kube client: %v\n", errors.WithStack(err))
×
173
        }
×
174

175
        // Setup scheme for all resources
176
        scheme := apiruntime.NewScheme()
×
177
        for _, f := range resourcesSchemeFuncs {
×
178
                err := f(scheme)
×
179
                if err != nil {
×
180
                        klog.Errorf("Failed to add to scheme: %v", err)
×
181
                        os.Exit(1)
×
182
                }
×
183
        }
184

185
        // client.New() returns a client without cache
186
        // since we don't have a cached client before manager init
187
        apiClient, err := client.New(cfg, client.Options{
×
188
                Scheme: scheme,
×
189
        })
×
190
        if err != nil {
×
191
                klog.Fatalf("Unable to get uncached client: %v\n", errors.WithStack(err))
×
192
        }
×
193

194
        opts := manager.Options{
×
195
                LeaderElection:             true,
×
196
                LeaderElectionNamespace:    namespace,
×
197
                LeaderElectionID:           "cdi-controller-leader-election-helper",
×
198
                LeaderElectionResourceLock: "leases",
×
199
                Cache:                      getCacheOptions(apiClient, namespace),
×
200
                Scheme:                     scheme,
×
201
        }
×
202

×
203
        mgr, err := manager.New(config.GetConfigOrDie(), opts)
×
204
        if err != nil {
×
205
                klog.Errorf("Unable to setup controller manager: %v", err)
×
206
                os.Exit(1)
×
207
        }
×
208

209
        uploadClientCAFetcher := &fetcher.FileCertFetcher{KeyFileName: controllerEnvs.UploadClientKeyFile, CertFileName: controllerEnvs.UploadClientCertFile}
×
210
        uploadClientBundleFetcher := &fetcher.ConfigMapCertBundleFetcher{
×
211
                Name:   controllerEnvs.UploadClientCaBundleConfigMap,
×
212
                Client: k8sClient.CoreV1().ConfigMaps(namespace),
×
213
        }
×
214
        uploadClientCertGenerator := &generator.FetchCertGenerator{Fetcher: uploadClientCAFetcher}
×
215

×
216
        uploadServerCAFetcher := &fetcher.FileCertFetcher{KeyFileName: controllerEnvs.UploadServerKeyFile, CertFileName: controllerEnvs.UploadServerCertFile}
×
217
        uploadServerBundleFetcher := &fetcher.ConfigMapCertBundleFetcher{
×
218
                Name:   controllerEnvs.UploadServerCaBundleConfigMap,
×
219
                Client: k8sClient.CoreV1().ConfigMaps(namespace),
×
220
        }
×
221
        uploadServerCertGenerator := &generator.FetchCertGenerator{Fetcher: uploadServerCAFetcher}
×
222

×
223
        if _, err := controller.NewConfigController(mgr, log, uploadProxyServiceName, configName, installerLabels); err != nil {
×
224
                klog.Errorf("Unable to setup config controller: %v", err)
×
225
                os.Exit(1)
×
226
        }
×
227

228
        if _, err := controller.NewStorageProfileController(mgr, log, installerLabels); err != nil {
×
229
                klog.Errorf("Unable to setup storage profiles controller: %v", err)
×
230
                os.Exit(1)
×
231
        }
×
232

233
        if err := dvc.CreateCommonIndexes(mgr); err != nil {
×
234
                klog.Errorf("Unable to create shared indexes: %v", err)
×
235
                os.Exit(1)
×
236
        }
×
237

238
        ctx := signals.SetupSignalHandler()
×
239

×
240
        // TODO: Current DV controller had threadiness 3, should we do the same here, defaults to one thread.
×
241
        if _, err := dvc.NewImportController(ctx, mgr, log, installerLabels); err != nil {
×
242
                klog.Errorf("Unable to setup datavolume import controller: %v", err)
×
243
                os.Exit(1)
×
244
        }
×
245
        if _, err := dvc.NewUploadController(ctx, mgr, log, installerLabels); err != nil {
×
246
                klog.Errorf("Unable to setup datavolume upload controller: %v", err)
×
247
                os.Exit(1)
×
248
        }
×
249
        if _, err := dvc.NewPvcCloneController(ctx, mgr, log,
×
250
                clonerImage, importerImage, pullPolicy, getTokenPublicKey(), getTokenPrivateKey(), installerLabels); err != nil {
×
251
                klog.Errorf("Unable to setup datavolume pvc clone controller: %v", err)
×
252
                os.Exit(1)
×
253
        }
×
254
        if _, err := dvc.NewSnapshotCloneController(ctx, mgr, log,
×
255
                clonerImage, importerImage, pullPolicy, getTokenPublicKey(), getTokenPrivateKey(), installerLabels); err != nil {
×
256
                klog.Errorf("Unable to setup datavolume snapshot clone controller: %v", err)
×
257
                os.Exit(1)
×
258
        }
×
259
        if _, err := dvc.NewPopulatorController(ctx, mgr, log, installerLabels); err != nil {
×
260
                klog.Errorf("Unable to setup datavolume external-population controller: %v", err)
×
261
                os.Exit(1)
×
262
        }
×
263

264
        if _, err := controller.NewImportController(mgr, log, importerImage, pullPolicy, verbose, installerLabels); err != nil {
×
265
                klog.Errorf("Unable to setup import controller: %v", err)
×
266
                os.Exit(1)
×
267
        }
×
268

269
        if _, err := controller.NewCloneController(mgr, log, clonerImage, pullPolicy, verbose, uploadClientCertGenerator, uploadServerBundleFetcher, getTokenPublicKey(), installerLabels); err != nil {
×
270
                klog.Errorf("Unable to setup clone controller: %v", err)
×
271
                os.Exit(1)
×
272
        }
×
273

274
        if _, err := controller.NewUploadController(mgr, log, uploadServerImage, pullPolicy, verbose, uploadServerCertGenerator, uploadClientBundleFetcher, installerLabels); err != nil {
×
275
                klog.Errorf("Unable to setup upload controller: %v", err)
×
276
                os.Exit(1)
×
277
        }
×
278

279
        if _, err := transfer.NewObjectTransferController(mgr, log, installerLabels); err != nil {
×
280
                klog.Errorf("Unable to setup transfer controller: %v", err)
×
281
                os.Exit(1)
×
282
        }
×
283

284
        if _, err := controller.NewDataImportCronController(mgr, log, importerImage, pullPolicy, installerLabels); err != nil {
×
285
                klog.Errorf("Unable to setup dataimportcron controller: %v", err)
×
286
                os.Exit(1)
×
287
        }
×
288
        if _, err := controller.NewDataSourceController(mgr, log, installerLabels); err != nil {
×
289
                klog.Errorf("Unable to setup datasource controller: %v", err)
×
290
                os.Exit(1)
×
291
        }
×
292
        // Populator controllers and indexes
293
        if err := populators.CreateCommonPopulatorIndexes(mgr); err != nil {
×
294
                klog.Errorf("Unable to create common populator indexes: %v", err)
×
295
                os.Exit(1)
×
296
        }
×
297
        if _, err := populators.NewImportPopulator(ctx, mgr, log, installerLabels); err != nil {
×
298
                klog.Errorf("Unable to setup import populator: %v", err)
×
299
                os.Exit(1)
×
300
        }
×
301
        if _, err := populators.NewUploadPopulator(ctx, mgr, log, installerLabels); err != nil {
×
302
                klog.Errorf("Unable to setup upload populator: %v", err)
×
303
                os.Exit(1)
×
304
        }
×
305
        if _, err := populators.NewClonePopulator(ctx, mgr, log, clonerImage, pullPolicy, installerLabels, getTokenPublicKey()); err != nil {
×
306
                klog.Errorf("Unable to setup clone populator: %v", err)
×
307
                os.Exit(1)
×
308
        }
×
309
        if _, err := populators.NewForkliftPopulator(ctx, mgr, log, importerImage, ovirtPopulatorImage, installerLabels); err != nil {
×
310
                klog.Errorf("Unable to setup forklift populator: %v", err)
×
311
                os.Exit(1)
×
312
        }
×
313

314
        klog.V(1).Infoln("created cdi controllers")
×
315

×
316
        if err := mgr.Start(ctx); err != nil {
×
317
                klog.Errorf("Error running manager: %v", err)
×
318
                os.Exit(1)
×
319
        }
×
320
}
321

322
func main() {
×
323
        defer klog.Flush()
×
324
        debug := false
×
325
        verbosityLevel, err := strconv.Atoi(verbose)
×
326
        if err == nil && verbosityLevel > 1 {
×
327
                debug = true
×
328
        }
×
329
        err = envconfig.Process("", &controllerEnvs)
×
330
        if err != nil {
×
331
                klog.Fatalf("Unable to get environment variables: %v\n", errors.WithStack(err))
×
332
        }
×
333

334
        logf.SetLogger(zap.New(zap.Level(zapcore.Level(-1*verbosityLevel)), zap.UseDevMode(debug)))
×
335
        logf.Log.WithName("main").Info("Verbosity level", "verbose", verbose, "debug", debug)
×
336

×
337
        if err = createReadyFile(); err != nil {
×
338
                klog.Fatalf("Error creating ready file: %+v", err)
×
339
        }
×
340

341
        start()
×
342

×
343
        deleteReadyFile()
×
344

×
345
        klog.V(2).Infoln("cdi controller exited")
×
346
}
347

348
func createReadyFile() error {
×
349
        f, err := os.Create(readyFile)
×
350
        if err != nil {
×
351
                return err
×
352
        }
×
353
        defer f.Close()
×
354
        return nil
×
355
}
356

357
func deleteReadyFile() {
×
358
        os.Remove(readyFile)
×
359
}
×
360

361
func getTokenPublicKey() *rsa.PublicKey {
×
362
        keyBytes, err := os.ReadFile(controller.TokenPublicKeyPath)
×
363
        if err != nil {
×
364
                klog.Fatalf("Error reading apiserver public key")
×
365
        }
×
366

367
        key, err := controller.DecodePublicKey(keyBytes)
×
368
        if err != nil {
×
369
                klog.Fatalf("Error decoding public key")
×
370
        }
×
371

372
        return key
×
373
}
374

375
func getTokenPrivateKey() *rsa.PrivateKey {
×
376
        bytes, err := os.ReadFile(controller.TokenPrivateKeyPath)
×
377
        if err != nil {
×
378
                klog.Fatalf("Error reading private key")
×
379
        }
×
380

381
        obj, err := cert.ParsePrivateKeyPEM(bytes)
×
382
        if err != nil {
×
383
                klog.Fatalf("Error decoding private key")
×
384
        }
×
385

386
        key, ok := obj.(*rsa.PrivateKey)
×
387
        if !ok {
×
388
                klog.Fatalf("Invalid private key format")
×
389
        }
×
390

391
        return key
×
392
}
393

394
// Restricts some types in the cache's ListWatch to specific fields/labels per GVK at the specified object,
395
// other types will continue working normally.
396
// Note: objects you read once with the controller runtime client are cached.
397
// TODO: Make our watches way more specific using labels, for example,
398
// at the point of writing this, we don't care about VolumeSnapshots without the CDI label
399
func getCacheOptions(apiClient client.Client, cdiNamespace string) cache.Options {
×
400
        namespaceSelector := fields.Set{"metadata.namespace": cdiNamespace}.AsSelector()
×
401

×
402
        cacheOptions := cache.Options{
×
403
                ByObject: map[client.Object]cache.ByObject{
×
404
                        &networkingv1.Ingress{}: {
×
405
                                Field: namespaceSelector,
×
406
                        },
×
407
                        &batchv1.CronJob{}: {
×
408
                                Field: namespaceSelector,
×
409
                        },
×
410
                        &batchv1.Job{}: {
×
411
                                Field: namespaceSelector,
×
412
                        },
×
413
                        &v1.ConfigMap{}: {
×
414
                                Field: namespaceSelector,
×
415
                        },
×
416
                },
×
417
        }
×
418

×
419
        cacheOptionsByObjectForOpenshift := map[client.Object]cache.ByObject{
×
420
                &routev1.Route{}: {
×
421
                        Field: namespaceSelector,
×
422
                },
×
423
        }
×
424

×
425
        // Currently controller-runtime will fail if types in here are not installed in the cluster
×
426
        // https://github.com/kubernetes-sigs/controller-runtime/issues/2456
×
427
        if isOpenShift(apiClient) {
×
428
                for k, v := range cacheOptionsByObjectForOpenshift {
×
429
                        cacheOptions.ByObject[k] = v
×
430
                }
×
431
        }
432

433
        return cacheOptions
×
434
}
435

436
func isOpenShift(apiClient client.Client) bool {
×
437
        clusterVersion := &ocpconfigv1.ClusterVersion{
×
438
                ObjectMeta: metav1.ObjectMeta{
×
439
                        Name: "version",
×
440
                },
×
441
        }
×
442
        if err := apiClient.Get(context.TODO(), client.ObjectKeyFromObject(clusterVersion), clusterVersion); err != nil {
×
443
                if !meta.IsNoMatchError(err) {
×
444
                        klog.Errorf("Error getting clusterVersion: %v", err)
×
445
                        os.Exit(1)
×
446
                }
×
447
                return false
×
448
        }
449

450
        return true
×
451
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc