• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5105

26 Jan 2025 10:50AM UTC coverage: 59.44%. First build
#5105

Pull #3617

travis-ci

arnongilboa
Add PVC source support for DataImportCron

A PVC from any namespace can now be the source for a DataImportCron. The
source digest is based on the PVC UID, which is polled by the schedule
similarly to image stream, so when a new PVC is detected it will be
imported.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>
Pull Request #3617: Add PVC source support for DataImportCron

69 of 72 new or added lines in 2 files covered. (95.83%)

16796 of 28257 relevant lines covered (59.44%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.82
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        "github.com/pkg/errors"
33
        cronexpr "github.com/robfig/cron/v3"
34

35
        batchv1 "k8s.io/api/batch/v1"
36
        corev1 "k8s.io/api/core/v1"
37
        storagev1 "k8s.io/api/storage/v1"
38
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
39
        "k8s.io/apimachinery/pkg/api/meta"
40
        "k8s.io/apimachinery/pkg/api/resource"
41
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42
        "k8s.io/apimachinery/pkg/labels"
43
        "k8s.io/apimachinery/pkg/runtime"
44
        "k8s.io/apimachinery/pkg/types"
45
        "k8s.io/client-go/tools/record"
46
        openapicommon "k8s.io/kube-openapi/pkg/common"
47
        "k8s.io/utils/ptr"
48

49
        "sigs.k8s.io/controller-runtime/pkg/client"
50
        "sigs.k8s.io/controller-runtime/pkg/controller"
51
        "sigs.k8s.io/controller-runtime/pkg/event"
52
        "sigs.k8s.io/controller-runtime/pkg/handler"
53
        "sigs.k8s.io/controller-runtime/pkg/manager"
54
        "sigs.k8s.io/controller-runtime/pkg/predicate"
55
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
56
        "sigs.k8s.io/controller-runtime/pkg/source"
57

58
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
59
        "kubevirt.io/containerized-data-importer/pkg/common"
60
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
61
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
62
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
63
        "kubevirt.io/containerized-data-importer/pkg/operator"
64
        "kubevirt.io/containerized-data-importer/pkg/util"
65
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
66
)
67

68
const (
69
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
70
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
71
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
72
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
73
)
74

75
// DataImportCronReconciler members
76
type DataImportCronReconciler struct {
77
        client          client.Client
78
        uncachedClient  client.Client
79
        recorder        record.EventRecorder
80
        scheme          *runtime.Scheme
81
        log             logr.Logger
82
        image           string
83
        pullPolicy      string
84
        cdiNamespace    string
85
        installerLabels map[string]string
86
}
87

88
const (
89
        // AnnSourceDesiredDigest is the digest of the pending updated image
90
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
91
        // AnnImageStreamDockerRef is the ImageStream Docker reference
92
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
93
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
94
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
95
        // AnnLastCronTime is the cron last execution time stamp
96
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
97
        // AnnLastUseTime is the PVC last use time stamp
98
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
99
        // AnnStorageClass is the cron DV's storage class
100
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
101

102
        dataImportControllerName    = "dataimportcron-controller"
103
        digestSha256Prefix          = "sha256:"
104
        digestUIDPrefix             = "uid:"
105
        digestDvNameSuffixLength    = 12
106
        cronJobUIDSuffixLength      = 8
107
        defaultImportsToKeepPerCron = 3
108
)
109

110
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
111

112
// Reconcile loop for DataImportCronReconciler
113
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
114
        dataImportCron := &cdiv1.DataImportCron{}
1✔
115
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
116
                return reconcile.Result{}, err
1✔
117
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
118
                err := r.cleanup(ctx, req.NamespacedName)
1✔
119
                return reconcile.Result{}, err
1✔
120
        }
1✔
121
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
122
        if !shouldReconcile || err != nil {
1✔
123
                return reconcile.Result{}, err
×
124
        }
×
125

126
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
127
                return reconcile.Result{}, err
×
128
        }
×
129

130
        return r.update(ctx, dataImportCron)
1✔
131
}
132

133
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
134
        dataSource := &cdiv1.DataSource{}
1✔
135
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
136
                if k8serrors.IsNotFound(err) {
2✔
137
                        return true, nil
1✔
138
                }
1✔
139
                return false, err
×
140
        }
141
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
142
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
143
                return true, nil
1✔
144
        }
1✔
145
        otherCron := &cdiv1.DataImportCron{}
1✔
146
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
147
                if k8serrors.IsNotFound(err) {
2✔
148
                        return true, nil
1✔
149
                }
1✔
150
                return false, err
×
151
        }
152
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
153
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
154
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
155
                r.log.V(3).Info(msg)
1✔
156
                return false, nil
1✔
157
        }
1✔
158
        return true, nil
×
159
}
160

161
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
162
        if dataImportCron.Spec.Schedule == "" {
2✔
163
                return nil
1✔
164
        }
1✔
165
        if isControllerPolledSource(dataImportCron) {
2✔
166
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
167
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
168
                }
1✔
169
                return nil
1✔
170
        }
171
        if !isURLSource(dataImportCron) {
1✔
172
                return nil
×
173
        }
×
174
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
175
        if exists || err != nil {
2✔
176
                return err
1✔
177
        }
1✔
178
        cronJob, err := r.newCronJob(dataImportCron)
1✔
179
        if err != nil {
1✔
180
                return err
×
181
        }
×
182
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
183
                return err
×
184
        }
×
185
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
186
        if err != nil {
1✔
187
                return err
×
188
        }
×
189
        if err := r.client.Create(ctx, job); err != nil {
1✔
190
                return err
×
191
        }
×
192
        return nil
1✔
193
}
194

195
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
196
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
197
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
198
        }
×
199
        imageStream := &imagev1.ImageStream{}
1✔
200
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
201
        if err != nil {
2✔
202
                return nil, "", err
1✔
203
        }
1✔
204
        imageStreamNamespacedName := types.NamespacedName{
1✔
205
                Namespace: imageStreamNamespace,
1✔
206
                Name:      name,
1✔
207
        }
1✔
208
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
209
                return nil, "", err
1✔
210
        }
1✔
211
        return imageStream, tag, nil
1✔
212
}
213

214
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
215
        if imageStream == nil {
1✔
216
                return "", "", errors.Errorf("No ImageStream")
×
217
        }
×
218
        tags := imageStream.Status.Tags
1✔
219
        if len(tags) == 0 {
1✔
220
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
221
        }
×
222

223
        tagIdx := 0
1✔
224
        if len(imageStreamTag) > 0 {
2✔
225
                tagIdx = -1
1✔
226
                for i, tag := range tags {
2✔
227
                        if tag.Tag == imageStreamTag {
2✔
228
                                tagIdx = i
1✔
229
                                break
1✔
230
                        }
231
                }
232
        }
233
        if tagIdx == -1 {
2✔
234
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
235
        }
1✔
236

237
        if len(tags[tagIdx].Items) == 0 {
2✔
238
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
239
        }
1✔
240
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
241
}
242

243
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
244
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
245
                return imageStreamName, "", nil
1✔
246
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
247
                return subs[0], subs[1], nil
1✔
248
        }
1✔
249
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
250
}
251

252
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
253
        if nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]; nextTimeStr != "" {
2✔
254
                nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
255
                if err != nil {
1✔
256
                        return reconcile.Result{}, err
×
257
                }
×
258
                if nextTime.Before(time.Now()) {
2✔
259
                        if isImageStreamSource(dataImportCron) {
2✔
260
                                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
261
                                        return reconcile.Result{}, err
1✔
262
                                }
1✔
263
                        } else if isPvcSource(dataImportCron) {
2✔
264
                                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
265
                                        return reconcile.Result{}, err
1✔
266
                                }
1✔
267
                        }
268
                }
269
        }
270
        return r.setNextCronTime(dataImportCron)
1✔
271
}
272

273
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
274
        now := time.Now()
1✔
275
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
276
        if err != nil {
1✔
277
                return reconcile.Result{}, err
×
278
        }
×
279
        nextTime := expr.Next(now)
1✔
280
        requeueAfter := nextTime.Sub(now)
1✔
281
        res := reconcile.Result{Requeue: true, RequeueAfter: requeueAfter}
1✔
282
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
283
        return res, err
1✔
284
}
285

286
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
287
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
288
        return err == nil && regSource.ImageStream != nil
1✔
289
}
1✔
290

291
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
292
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
293
        return err == nil && regSource.URL != nil
1✔
294
}
1✔
295

296
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
297
        if !isCronRegistrySource(cron) {
2✔
298
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
299
        }
1✔
300
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
301
}
302

303
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
304
        source := cron.Spec.Template.Spec.Source
1✔
305
        return source != nil && source.Registry != nil
1✔
306
}
1✔
307

308
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
309
        if !isPvcSource(cron) {
1✔
NEW
310
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
311
        }
×
312
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
313
}
314

315
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
316
        source := cron.Spec.Template.Spec.Source
1✔
317
        return source != nil && source.PVC != nil
1✔
318
}
1✔
319

320
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
321
        return isImageStreamSource(cron) || isPvcSource(cron)
1✔
322
}
1✔
323

324
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
325
        res := reconcile.Result{}
1✔
326

1✔
327
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
328
        if err != nil {
1✔
329
                return res, err
×
330
        }
×
331

332
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
333
        imports := dataImportCron.Status.CurrentImports
1✔
334
        importSucceeded := false
1✔
335

1✔
336
        dataVolume := dataImportCron.Spec.Template
1✔
337
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
338
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
339
        if err != nil {
1✔
340
                return res, err
×
341
        }
×
342
        if desiredStorageClass != nil {
2✔
343
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
344
                        return res, err
1✔
345
                }
1✔
346
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
347
                desiredSc := desiredStorageClass.Name
1✔
348
                if hasCurrent && currentSc != desiredSc {
2✔
349
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
350
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
351
                                return res, err
×
352
                        }
×
353
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
354
                }
355
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
356
        }
357
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
358
        if err != nil {
1✔
359
                return res, err
×
360
        }
×
361
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
362
        if err != nil {
1✔
363
                return res, err
×
364
        }
×
365

366
        handlePopulatedPvc := func() error {
2✔
367
                if pvc != nil {
2✔
368
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
369
                                return err
×
370
                        }
×
371
                }
372
                importSucceeded = true
1✔
373
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
374
                        return err
×
375
                }
×
376

377
                return nil
1✔
378
        }
379

380
        switch {
1✔
381
        case dv != nil:
1✔
382
                switch dv.Status.Phase {
1✔
383
                case cdiv1.Succeeded:
1✔
384
                        if err := handlePopulatedPvc(); err != nil {
1✔
385
                                return res, err
×
386
                        }
×
387
                case cdiv1.ImportScheduled:
1✔
388
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
389
                case cdiv1.ImportInProgress:
1✔
390
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
391
                default:
1✔
392
                        dvPhase := string(dv.Status.Phase)
1✔
393
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
394
                }
395
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
396
                if err := handlePopulatedPvc(); err != nil {
1✔
397
                        return res, err
×
398
                }
×
399
        case snapshot != nil:
1✔
400
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
401
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
402
                                return res, err
×
403
                        }
×
404
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
405
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
406
                }
407
                // Below k8s 1.29 there's no way to know the source volume mode
408
                // Let's at least expose this info on our own snapshots
409
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
410
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
411
                        if err != nil {
1✔
412
                                return res, err
×
413
                        }
×
414
                        if volMode != nil {
2✔
415
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
416
                        }
1✔
417
                }
418
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
419
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
420
                if err != nil {
2✔
421
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
422
                                return res, err
×
423
                        }
×
424
                } else {
1✔
425
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
426
                }
1✔
427
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
428
                        return res, err
×
429
                }
×
430
                importSucceeded = true
1✔
431
        default:
1✔
432
                if len(imports) > 0 {
2✔
433
                        imports = imports[1:]
1✔
434
                        dataImportCron.Status.CurrentImports = imports
1✔
435
                }
1✔
436
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
437
        }
438

439
        if importSucceeded {
2✔
440
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
441
                        return res, err
×
442
                }
×
443
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
444
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
445
                        return res, err
×
446
                }
×
447
        }
448

449
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
450
                return res, err
×
451
        }
×
452

453
        // Skip if schedule is disabled
454
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
455
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
456
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
457
                if err != nil {
2✔
458
                        return pollRes, err
1✔
459
                }
1✔
460
                res = pollRes
1✔
461
        }
462

463
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
464
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
465
        if digestUpdated {
2✔
466
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
467
                if dv != nil {
1✔
468
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
469
                                return res, err
×
470
                        }
×
471
                }
472
                if importSucceeded || len(imports) == 0 {
2✔
473
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
474
                                return res, err
1✔
475
                        }
1✔
476
                }
477
        } else if importSucceeded {
2✔
478
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
479
                        return res, err
×
480
                }
×
481
        } else if len(imports) > 0 {
2✔
482
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
483
        } else {
2✔
484
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
485
        }
1✔
486

487
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
488
                return res, err
×
489
        }
×
490

491
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
492
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
493
                        return res, err
×
494
                }
×
495
        }
496
        return res, nil
1✔
497
}
498

499
// Returns the current import DV if exists, and the last imported PVC
500
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
501
        imports := cron.Status.CurrentImports
1✔
502
        if len(imports) == 0 {
2✔
503
                return nil, nil, nil
1✔
504
        }
1✔
505

506
        dvName := imports[0].DataVolumeName
1✔
507
        dv := &cdiv1.DataVolume{}
1✔
508
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
509
                if !k8serrors.IsNotFound(err) {
1✔
510
                        return nil, nil, err
×
511
                }
×
512
                dv = nil
1✔
513
        }
514

515
        pvc := &corev1.PersistentVolumeClaim{}
1✔
516
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
517
                if !k8serrors.IsNotFound(err) {
1✔
518
                        return nil, nil, err
×
519
                }
×
520
                pvc = nil
1✔
521
        }
522
        return dv, pvc, nil
1✔
523
}
524

525
// Returns the current import DV if exists, and the last imported PVC
526
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
527
        imports := cron.Status.CurrentImports
1✔
528
        if len(imports) == 0 {
2✔
529
                return nil, nil
1✔
530
        }
1✔
531

532
        snapName := imports[0].DataVolumeName
1✔
533
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
534
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
535
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
536
                        return nil, err
×
537
                }
×
538
                return nil, nil
1✔
539
        }
540

541
        return snapshot, nil
1✔
542
}
543

544
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
545
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
546
        dataSource := &cdiv1.DataSource{}
1✔
547
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
548
                return nil, err
1✔
549
        }
1✔
550
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
551
                log := r.log.WithName("getCronManagedDataSource")
×
552
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
553
                return nil, ErrNotManagedByCron
×
554
        }
×
555
        return dataSource, nil
1✔
556
}
557

558
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
559
        objCopy := obj.DeepCopyObject()
1✔
560
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
561
        r.setDataImportCronResourceLabels(cron, obj)
1✔
562
        if !reflect.DeepEqual(obj, objCopy) {
2✔
563
                if err := r.client.Update(ctx, obj); err != nil {
1✔
564
                        return err
×
565
                }
×
566
        }
567
        return nil
1✔
568
}
569

570
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
571
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
572
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
573
                if cond.Status == corev1.ConditionFalse &&
×
574
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
575
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
576
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
577
                        dv.Labels[common.DataImportCronLabel] = ""
×
578
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
579
                                return err
×
580
                        }
×
581
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
582
                                return err
×
583
                        }
×
584
                        cron.Status.CurrentImports = nil
×
585
                }
586
        }
587
        return nil
×
588
}
589

590
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
591
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
592
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
593
        if err != nil {
1✔
594
                return err
×
595
        }
×
596
        if regSource.ImageStream == nil {
1✔
597
                return nil
×
598
        }
×
599
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
600
        if err != nil {
2✔
601
                return err
1✔
602
        }
1✔
603
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
604
        if err != nil {
2✔
605
                return err
1✔
606
        }
1✔
607
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
608
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
609
                log.Info("Updating DataImportCron", "digest", digest)
1✔
610
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
611
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
612
        }
1✔
613
        return nil
1✔
614
}
615

616
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
617
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
618
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
619
        if err != nil {
1✔
NEW
620
                return err
×
NEW
621
        }
×
622
        ns := pvcSource.Namespace
1✔
623
        if ns == "" {
2✔
624
                ns = dataImportCron.Namespace
1✔
625
        }
1✔
626
        pvc := &corev1.PersistentVolumeClaim{}
1✔
627
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
628
                return err
1✔
629
        }
1✔
630
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
631
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
632
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
633
                log.Info("Updating DataImportCron", "digest", digest)
1✔
634
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
635
        }
1✔
636
        return nil
1✔
637
}
638

639
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
640
        log := r.log.WithName("updateDataSource")
1✔
641
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
642
        if err != nil {
2✔
643
                if k8serrors.IsNotFound(err) {
2✔
644
                        dataSource = r.newDataSource(dataImportCron)
1✔
645
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
646
                                return err
×
647
                        }
×
648
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
649
                } else if errors.Is(err, ErrNotManagedByCron) {
×
650
                        return nil
×
651
                } else {
×
652
                        return err
×
653
                }
×
654
        }
655
        dataSourceCopy := dataSource.DeepCopy()
1✔
656
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
657

1✔
658
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
659
        populateDataSource(format, dataSource, sourcePVC)
1✔
660

1✔
661
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
662
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
663
                        return err
×
664
                }
×
665
        }
666

667
        return nil
1✔
668
}
669

670
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
671
        if sourcePVC == nil {
2✔
672
                return
1✔
673
        }
1✔
674

675
        switch format {
1✔
676
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
677
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
678
                        PVC: sourcePVC,
1✔
679
                }
1✔
680
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
681
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
682
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
683
                                Namespace: sourcePVC.Namespace,
1✔
684
                                Name:      sourcePVC.Name,
1✔
685
                        },
1✔
686
                }
1✔
687
        }
688
}
689

690
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
691
        if dataImportCron.Status.CurrentImports == nil {
1✔
692
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
693
        }
×
694
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
695
                Namespace: dataImportCron.Namespace,
1✔
696
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
697
        }
1✔
698
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
699
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
700
                now := metav1.Now()
1✔
701
                dataImportCron.Status.LastImportTimestamp = &now
1✔
702
        }
1✔
703
        return nil
1✔
704
}
705

706
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
707
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
708
        if lastTimeStr == "" {
2✔
709
                return nil
1✔
710
        }
1✔
711
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
712
        if err != nil {
1✔
713
                return err
×
714
        }
×
715
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
716
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
717
        }
1✔
718
        return nil
1✔
719
}
720

721
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
722
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
723
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
724
        if digest == "" {
1✔
725
                return nil
×
726
        }
×
727
        dvName, err := createDvName(dataSourceName, digest)
1✔
728
        if err != nil {
2✔
729
                return err
1✔
730
        }
1✔
731
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
732

1✔
733
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
734
        for _, src := range sources {
2✔
735
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
736
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
737
                                return err
×
738
                        }
×
739
                } else {
1✔
740
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
741
                                return err
×
742
                        }
×
743
                        // If source exists don't create DV
744
                        return nil
1✔
745
                }
746
        }
747

748
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
749
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
750
                return err
×
751
        }
×
752

753
        return nil
1✔
754
}
755

756
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
757
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
758
        if !ok {
1✔
759
                // nothing to delete
×
760
                return nil
×
761
        }
×
762
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
763
        if err != nil {
1✔
764
                return err
×
765
        }
×
766
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
767
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
768
        for _, src := range sources {
2✔
769
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
770
                        return err
×
771
                }
×
772
        }
773
        for _, src := range sources {
2✔
774
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
775
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
776
                }
×
777
        }
778
        // Only update desired storage class once garbage collection went through
779
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
780
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
781
        if err != nil {
1✔
782
                return err
×
783
        }
×
784

785
        return nil
1✔
786
}
787

788
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
789
        switch format {
1✔
790
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
791
                return nil
1✔
792
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
793
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
794
        default:
×
795
                return fmt.Errorf("unknown source format for snapshot")
×
796
        }
797
}
798

799
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
800
        if pvc == nil {
1✔
801
                return nil
×
802
        }
×
803
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
804
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
805
                return nil
1✔
806
        }
1✔
807
        storageProfile := &cdiv1.StorageProfile{}
1✔
808
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
809
                return err
×
810
        }
×
811
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
812
        if err != nil {
1✔
813
                return err
×
814
        }
×
815
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
816
                ObjectMeta: metav1.ObjectMeta{
1✔
817
                        Name:      pvc.Name,
1✔
818
                        Namespace: dataImportCron.Namespace,
1✔
819
                        Labels: map[string]string{
1✔
820
                                common.CDILabelKey:       common.CDILabelValue,
1✔
821
                                common.CDIComponentLabel: "",
1✔
822
                        },
1✔
823
                },
1✔
824
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
825
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
826
                                PersistentVolumeClaimName: &pvc.Name,
1✔
827
                        },
1✔
828
                        VolumeSnapshotClassName: &className,
1✔
829
                },
1✔
830
        }
1✔
831
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
832
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
833

1✔
834
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
835
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
836
                if !k8serrors.IsNotFound(err) {
1✔
837
                        return err
×
838
                }
×
839
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
840
                if pvc.Spec.VolumeMode != nil {
2✔
841
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
842
                }
1✔
843
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
844
                        return err
×
845
                }
×
846
        } else {
1✔
847
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
848
                        // Clean up DV/PVC as they are not needed anymore
1✔
849
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
850
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
851
                                return err
×
852
                        }
×
853
                }
854
        }
855

856
        return nil
1✔
857
}
858

859
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
860
        dataImportCron.Status.SourceFormat = &format
1✔
861

1✔
862
        switch format {
1✔
863
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
864
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
865
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
866
                if snapshot == nil {
2✔
867
                        // Snapshot create/update will trigger reconcile
1✔
868
                        return nil
1✔
869
                }
1✔
870
                if cc.IsSnapshotReady(snapshot) {
2✔
871
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
872
                } else {
2✔
873
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
874
                }
1✔
875
        default:
×
876
                return fmt.Errorf("unknown source format for snapshot")
×
877
        }
878

879
        return nil
1✔
880
}
881

882
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
883
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
884
        if desiredStorageClass == nil {
2✔
885
                return format, nil
1✔
886
        }
1✔
887

888
        storageProfile := &cdiv1.StorageProfile{}
1✔
889
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
890
                return format, err
×
891
        }
×
892
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
893
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
894
        }
1✔
895

896
        return format, nil
1✔
897
}
898

899
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
900
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
901
                return nil
×
902
        }
×
903
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
904
        if err != nil {
1✔
905
                return err
×
906
        }
×
907

908
        maxImports := defaultImportsToKeepPerCron
1✔
909

1✔
910
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
911
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
912
        }
1✔
913

914
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
915
                return err
×
916
        }
×
917
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
918
                return err
×
919
        }
×
920

921
        return nil
1✔
922
}
923

924
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
925
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
926

1✔
927
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
928
                return err
×
929
        }
×
930
        if len(pvcList.Items) > maxImports {
2✔
931
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
932
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
933
                })
1✔
934
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
935
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
936
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
937
                                return err
×
938
                        }
×
939
                }
940
        }
941

942
        dvList := &cdiv1.DataVolumeList{}
1✔
943
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
944
                return err
×
945
        }
×
946

947
        if len(dvList.Items) > maxImports {
2✔
948
                for _, dv := range dvList.Items {
2✔
949
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
950
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
951
                                return err
×
952
                        }
×
953

954
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
955
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
956
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
957
                                        return err
×
958
                                }
×
959
                        }
960
                }
961
        }
962

963
        return nil
1✔
964
}
965

966
// deleteDvPvc deletes DV or PVC if DV was GCed
967
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
968
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
969
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
970
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
971
                return err
1✔
972
        }
1✔
973
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
974
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
975
                return err
×
976
        }
×
977
        return nil
1✔
978
}
979

980
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
981
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
982

1✔
983
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
984
                if meta.IsNoMatchError(err) {
×
985
                        return nil
×
986
                }
×
987
                return err
×
988
        }
989
        if len(snapList.Items) > maxImports {
1✔
990
                sort.Slice(snapList.Items, func(i, j int) bool {
×
991
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
992
                })
×
993
                for _, snap := range snapList.Items[maxImports:] {
×
994
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
995
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
996
                                return err
×
997
                        }
×
998
                }
999
        }
1000

1001
        return nil
1✔
1002
}
1003

1004
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1005
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1006
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1007

1✔
1008
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1009
                return err
×
1010
        }
×
1011
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1012
        if err != nil {
1✔
1013
                return err
×
1014
        }
×
1015
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1016
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1017
                return err
×
1018
        }
×
1019
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1020
                return err
×
1021
        }
×
1022
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1023
                return err
×
1024
        }
×
1025
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1026
                return err
×
1027
        }
×
1028
        return nil
1✔
1029
}
1030

1031
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1032
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1033
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1034
        if err != nil {
1✔
1035
                return err
×
1036
        }
×
1037
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1038
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1039
                return err
×
1040
        }
×
1041
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1042
                return err
×
1043
        }
×
1044

1045
        return nil
1✔
1046
}
1047

1048
// NewDataImportCronController creates a new instance of the DataImportCron controller
1049
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1050
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1051
                Scheme: mgr.GetScheme(),
×
1052
                Mapper: mgr.GetRESTMapper(),
×
1053
        })
×
1054
        if err != nil {
×
1055
                return nil, err
×
1056
        }
×
1057
        reconciler := &DataImportCronReconciler{
×
1058
                client:          mgr.GetClient(),
×
1059
                uncachedClient:  uncachedClient,
×
1060
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1061
                scheme:          mgr.GetScheme(),
×
1062
                log:             log.WithName(dataImportControllerName),
×
1063
                image:           importerImage,
×
1064
                pullPolicy:      pullPolicy,
×
1065
                cdiNamespace:    util.GetNamespace(),
×
1066
                installerLabels: installerLabels,
×
1067
        }
×
1068
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1069
                MaxConcurrentReconciles: 3,
×
1070
                Reconciler:              reconciler,
×
1071
        })
×
1072
        if err != nil {
×
1073
                return nil, err
×
1074
        }
×
1075
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1076
                return nil, err
×
1077
        }
×
1078
        log.Info("Initialized DataImportCron controller")
×
1079
        return dataImportCronController, nil
×
1080
}
1081

1082
func getCronName(obj client.Object) string {
×
1083
        return obj.GetLabels()[common.DataImportCronLabel]
×
1084
}
×
1085

1086
func getCronNs(obj client.Object) string {
×
1087
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1088
}
×
1089

1090
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1091
        if cronName := getCronName(obj); cronName != "" {
×
1092
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1093
        }
×
1094
        return nil
×
1095
}
1096

1097
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1098
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1099
                return err
×
1100
        }
×
1101

1102
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1103
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1104
                // Otherwise we risk losing the storage profile event
×
1105
                var crons cdiv1.DataImportCronList
×
1106
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1107
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1108
                        return nil
×
1109
                }
×
1110
                // Storage profiles are 1:1 to storage classes
1111
                scName := obj.GetName()
×
1112
                var reqs []reconcile.Request
×
1113
                for _, cron := range crons.Items {
×
1114
                        dataVolume := cron.Spec.Template
×
1115
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1116
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1117
                        if err != nil || templateSc == nil {
×
1118
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1119
                                return reqs
×
1120
                        }
×
1121
                        if templateSc.Name == scName {
×
1122
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1123
                        }
×
1124
                }
1125
                return reqs
×
1126
        }
1127

1128
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1129
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1130
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1131
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1132
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1133
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1134
                },
1135
        )); err != nil {
×
1136
                return err
×
1137
        }
×
1138

1139
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1140
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1141
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1142
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1143
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1144
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1145
                },
1146
        )); err != nil {
×
1147
                return err
×
1148
        }
×
1149

1150
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1151
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1152
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1153
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1154
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1155
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1156
                },
1157
        )); err != nil {
×
1158
                return err
×
1159
        }
×
1160

1161
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1162
                return err
×
1163
        }
×
1164

1165
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1166
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1167
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1168
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1169
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1170
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1171
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
1172
                        },
×
1173
                },
1174
        )); err != nil {
×
1175
                return err
×
1176
        }
×
1177

1178
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1179
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1180
        }
×
1181

1182
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1183
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1184
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1185
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1186
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1187
                        },
×
1188
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1189
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1190
                },
1191
        )); err != nil {
×
1192
                return err
×
1193
        }
×
1194

1195
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1196
                if meta.IsNoMatchError(err) {
×
1197
                        // Back out if there's no point to attempt watch
×
1198
                        return nil
×
1199
                }
×
1200
                if !cc.IsErrCacheNotStarted(err) {
×
1201
                        return err
×
1202
                }
×
1203
        }
1204
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1205
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1206
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1207
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1208
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1209
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1210
                },
1211
        )); err != nil {
×
1212
                return err
×
1213
        }
×
1214

1215
        return nil
×
1216
}
1217

1218
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1219
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1220
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1221
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1222
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1223
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1224
                                log.Info("Update", "sc", obj.GetName(),
×
1225
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1226
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1227
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1228
                                if err != nil {
×
1229
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1230
                                }
×
1231
                                return reqs
×
1232
                        },
1233
                ),
1234
                predicate.TypedFuncs[*storagev1.StorageClass]{
1235
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1236
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1237
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1238
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1239
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1240
                        },
×
1241
                },
1242
        )); err != nil {
×
1243
                return err
×
1244
        }
×
1245

1246
        return nil
×
1247
}
1248

1249
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1250
        dicList := &cdiv1.DataImportCronList{}
×
1251
        if err := c.List(ctx, dicList); err != nil {
×
1252
                return nil, err
×
1253
        }
×
1254
        reqs := []reconcile.Request{}
×
1255
        for _, dic := range dicList.Items {
×
1256
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1257
                        continue
×
1258
                }
1259

1260
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1261
        }
1262

1263
        return reqs, nil
×
1264
}
1265

1266
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1267
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1268
                return false, nil
1✔
1269
        }
1✔
1270

1271
        sc := pvc.Spec.StorageClassName
1✔
1272
        if sc == nil || *sc == desiredStorageClass {
2✔
1273
                return false, nil
1✔
1274
        }
1✔
1275

1276
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1277
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1278
                return false, err
×
1279
        }
×
1280

1281
        return true, nil
1✔
1282
}
1283

1284
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1285
        cronJob := &batchv1.CronJob{}
1✔
1286
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1287
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1288
                return false, cc.IgnoreNotFound(err)
1✔
1289
        }
1✔
1290

1291
        cronJobCopy := cronJob.DeepCopy()
1✔
1292
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1293
                return false, err
×
1294
        }
×
1295

1296
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1297
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1298
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1299
                        return false, cc.IgnoreNotFound(err)
×
1300
                }
×
1301
        }
1302
        return true, nil
1✔
1303
}
1304

1305
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1306
        cronJob := &batchv1.CronJob{
1✔
1307
                ObjectMeta: metav1.ObjectMeta{
1✔
1308
                        Name:      GetCronJobName(cron),
1✔
1309
                        Namespace: r.cdiNamespace,
1✔
1310
                },
1✔
1311
        }
1✔
1312
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1313
                return nil, err
×
1314
        }
×
1315
        return cronJob, nil
1✔
1316
}
1317

1318
// InitPollerPodSpec inits poller PodSpec
1319
func InitPollerPodSpec(c client.Client, cron *cdiv1.DataImportCron, podSpec *corev1.PodSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1320
        regSource, err := getCronRegistrySource(cron)
1✔
1321
        if err != nil {
1✔
1322
                return err
×
1323
        }
×
1324
        if regSource.URL == nil {
1✔
1325
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1326
        }
×
1327
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1328
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1329
                return err
×
1330
        }
×
1331
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1332
        if err != nil {
1✔
1333
                return err
×
1334
        }
×
1335
        container := corev1.Container{
1✔
1336
                Name:  "cdi-source-update-poller",
1✔
1337
                Image: image,
1✔
1338
                Command: []string{
1✔
1339
                        "/usr/bin/cdi-source-update-poller",
1✔
1340
                        "-ns", cron.Namespace,
1✔
1341
                        "-cron", cron.Name,
1✔
1342
                        "-url", *regSource.URL,
1✔
1343
                },
1✔
1344
                ImagePullPolicy:          pullPolicy,
1✔
1345
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1346
                TerminationMessagePolicy: corev1.TerminationMessageReadFile,
1✔
1347
        }
1✔
1348

1✔
1349
        var volumes []corev1.Volume
1✔
1350
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1351
        if hasCertConfigMap {
1✔
1352
                vm := corev1.VolumeMount{
×
1353
                        Name:      CertVolName,
×
1354
                        MountPath: common.ImporterCertDir,
×
1355
                }
×
1356
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1357
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1358
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1359
        }
×
1360

1361
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1362
                vm := corev1.VolumeMount{
1✔
1363
                        Name:      ProxyCertVolName,
1✔
1364
                        MountPath: common.ImporterProxyCertDir,
1✔
1365
                }
1✔
1366
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1367
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1368
        }
1✔
1369

1370
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1371
                container.Env = append(container.Env,
×
1372
                        corev1.EnvVar{
×
1373
                                Name: common.ImporterAccessKeyID,
×
1374
                                ValueFrom: &corev1.EnvVarSource{
×
1375
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1376
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1377
                                                        Name: *regSource.SecretRef,
×
1378
                                                },
×
1379
                                                Key: common.KeyAccess,
×
1380
                                        },
×
1381
                                },
×
1382
                        },
×
1383
                        corev1.EnvVar{
×
1384
                                Name: common.ImporterSecretKey,
×
1385
                                ValueFrom: &corev1.EnvVarSource{
×
1386
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1387
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1388
                                                        Name: *regSource.SecretRef,
×
1389
                                                },
×
1390
                                                Key: common.KeySecret,
×
1391
                                        },
×
1392
                                },
×
1393
                        },
×
1394
                )
×
1395
        }
×
1396

1397
        addEnvVar := func(varName, value string) {
2✔
1398
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1399
        }
1✔
1400

1401
        if insecureTLS {
1✔
1402
                addEnvVar(common.InsecureTLSVar, "true")
×
1403
        }
×
1404

1405
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1406
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1407
                        addEnvVar(varName, value)
1✔
1408
                }
1✔
1409
        }
1410

1411
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1412
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1413
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1414

1✔
1415
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1416
        if err != nil {
1✔
1417
                return err
×
1418
        }
×
1419
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1420
        if err != nil {
1✔
1421
                return err
×
1422
        }
×
1423

1424
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1425
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1426
        podSpec.Containers = []corev1.Container{container}
1✔
1427
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1428
        podSpec.Volumes = volumes
1✔
1429
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1430
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1431
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1432
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1433

1✔
1434
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1435

1✔
1436
        return nil
1✔
1437
}
1438

1439
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1440
        cronJobSpec := &cronJob.Spec
1✔
1441
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1442
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1443
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1444
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1445

1✔
1446
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1447
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1448
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1449

1✔
1450
        podSpec := &jobSpec.Template.Spec
1✔
1451
        if err := InitPollerPodSpec(r.client, cron, podSpec, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1452
                return err
×
1453
        }
×
1454
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1455
                return err
×
1456
        }
×
1457
        return nil
1✔
1458
}
1459

1460
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1461
        job := &batchv1.Job{
1✔
1462
                ObjectMeta: metav1.ObjectMeta{
1✔
1463
                        Name:      GetInitialJobName(cron),
1✔
1464
                        Namespace: cronJob.Namespace,
1✔
1465
                },
1✔
1466
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1467
        }
1✔
1468
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1469
                return nil, err
×
1470
        }
×
1471
        return job, nil
1✔
1472
}
1473

1474
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1475
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1476
                return err
×
1477
        }
×
1478
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1479
        labels := obj.GetLabels()
1✔
1480
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1481
        labels[common.DataImportCronLabel] = cron.Name
1✔
1482
        obj.SetLabels(labels)
1✔
1483
        return nil
1✔
1484
}
1485

1486
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1487
        dv := cron.Spec.Template.DeepCopy()
1✔
1488
        if isCronRegistrySource(cron) {
2✔
1489
                var digestedURL string
1✔
1490
                if isURLSource(cron) {
2✔
1491
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1492
                } else if isImageStreamSource(cron) {
3✔
1493
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1494
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1495
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1496
                }
1✔
1497
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1498
        }
1499
        dv.Name = dataVolumeName
1✔
1500
        dv.Namespace = cron.Namespace
1✔
1501
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1502
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1503
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1504
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1505

1✔
1506
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1507
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1508
        }
1✔
1509

1510
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1511

1✔
1512
        return dv
1✔
1513
}
1514

1515
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1516
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1517
        labels := obj.GetLabels()
1✔
1518
        labels[common.DataImportCronLabel] = cron.Name
1✔
1519
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1520
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1521
        }
1✔
1522
        obj.SetLabels(labels)
1✔
1523
}
1524

1525
func untagDigestedDockerURL(dockerURL string) string {
1✔
1526
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1527
                url := u.Host + u.Path
1✔
1528
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1529
                // Check for tag
1✔
1530
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1531
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1532
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1533
                        }
1✔
1534
                }
1535
        }
1536
        return dockerURL
1✔
1537
}
1538

1539
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1540
        if val := cron.Labels[ann]; val != "" {
2✔
1541
                cc.AddLabel(dv, ann, val)
1✔
1542
        }
1✔
1543
}
1544

1545
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1546
        if val := cron.Annotations[ann]; val != "" {
1✔
1547
                cc.AddAnnotation(dv, ann, val)
×
1548
        }
×
1549
}
1550

1551
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1552
        dataSource := &cdiv1.DataSource{
1✔
1553
                ObjectMeta: metav1.ObjectMeta{
1✔
1554
                        Name:      cron.Spec.ManagedDataSource,
1✔
1555
                        Namespace: cron.Namespace,
1✔
1556
                },
1✔
1557
        }
1✔
1558
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1559
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1560
        return dataSource
1✔
1561
}
1✔
1562

1563
// Create DataVolume name based on the DataSource name + prefix of the digest
1564
func createDvName(prefix, digest string) (string, error) {
1✔
1565
        validDigestPrefixes := []string{digestSha256Prefix, digestUIDPrefix}
1✔
1566
        digestPrefix := ""
1✔
1567
        for _, p := range validDigestPrefixes {
2✔
1568
                if strings.HasPrefix(digest, p) {
2✔
1569
                        digestPrefix = p
1✔
1570
                        break
1✔
1571
                }
1572
        }
1573
        if digestPrefix == "" {
2✔
1574
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1575
        }
1✔
1576
        fromIdx := len(digestPrefix)
1✔
1577
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1578
        if len(digest) < toIdx {
2✔
1579
                return "", errors.Errorf("Digest is too short")
1✔
1580
        }
1✔
1581
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1582
}
1583

1584
// GetCronJobName get CronJob name based on cron name and UID
1585
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1586
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1587
}
1✔
1588

1589
// GetInitialJobName get initial job name based on cron name and UID
1590
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1591
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1592
}
1✔
1593

1594
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1595
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1596
}
1✔
1597

1598
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1599
        dv := &cron.Spec.Template
1✔
1600

1✔
1601
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1602
                return explicitVolumeMode, nil
×
1603
        }
×
1604

1605
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1606
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1607
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1608
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1609
                        AccessModes:      accessModes,
1✔
1610
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1611
                        Resources: corev1.VolumeResourceRequirements{
1✔
1612
                                Requests: corev1.ResourceList{
1✔
1613
                                        // Doesn't matter
1✔
1614
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1615
                                },
1✔
1616
                        },
1✔
1617
                },
1✔
1618
        }
1✔
1619
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1620
                return nil, err
×
1621
        }
×
1622

1623
        return inferredPvc.Spec.VolumeMode, nil
1✔
1624
}
1625

1626
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1627
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1628
        if dv.Spec.PVC != nil {
1✔
1629
                return dv.Spec.PVC.VolumeMode
×
1630
        }
×
1631

1632
        if dv.Spec.Storage != nil {
2✔
1633
                return dv.Spec.Storage.VolumeMode
1✔
1634
        }
1✔
1635

1636
        return nil
×
1637
}
1638

1639
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1640
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1641
        if dv.Spec.PVC != nil {
1✔
1642
                return dv.Spec.PVC.AccessModes
×
1643
        }
×
1644

1645
        if dv.Spec.Storage != nil {
2✔
1646
                return dv.Spec.Storage.AccessModes
1✔
1647
        }
1✔
1648

1649
        return nil
×
1650
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc