• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5790

22 Jan 2026 01:41PM UTC coverage: 49.487% (+0.03%) from 49.454%
#5790

Pull #3991

travis-ci

noamasu
Add provisioner-aware DataImportCron configuration via StorageProfile annotations

Some storage provisioners have specific requirements when creating boot source snapshots. For example, GKE Persistent Disk needs a VolumeSnapshotClass with snapshot-type: images parameter, and snapshots must be taken from an RWO PVC.
This change makes CDI automatically detect these provisioner-specific needs and configure DataImportCron accordingly.
The StorageProfile controller now reconciles two new annotations based on the underlying provisioner: cdi.kubevirt.io/useReadWriteOnceForDataImportCron signals that RWO should be used for access mode, and cdi.kubevirt.io/snapshotClassForDataImportCron specifies which VolumeSnapshotClass to use for boot source snapshots.
The DataImportCron controller watches these annotations and ensures the correct VolumeSnapshotClass and access mode are used.

Signed-off-by: Noam Assouline <nassouli@redhat.com>
Pull Request #3991: Add provisioner-aware VolumeSnapshotClass selection and RWO access mode for DataImportCron

80 of 141 new or added lines in 3 files covered. (56.74%)

10 existing lines in 2 files now uncovered.

14709 of 29723 relevant lines covered (49.49%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.86
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        authorizationv1 "k8s.io/api/authorization/v1"
37
        batchv1 "k8s.io/api/batch/v1"
38
        corev1 "k8s.io/api/core/v1"
39
        storagev1 "k8s.io/api/storage/v1"
40
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
41
        "k8s.io/apimachinery/pkg/api/meta"
42
        "k8s.io/apimachinery/pkg/api/resource"
43
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44
        "k8s.io/apimachinery/pkg/labels"
45
        "k8s.io/apimachinery/pkg/runtime"
46
        "k8s.io/apimachinery/pkg/types"
47
        "k8s.io/client-go/tools/record"
48
        openapicommon "k8s.io/kube-openapi/pkg/common"
49
        "k8s.io/utils/ptr"
50

51
        "sigs.k8s.io/controller-runtime/pkg/client"
52
        "sigs.k8s.io/controller-runtime/pkg/controller"
53
        "sigs.k8s.io/controller-runtime/pkg/event"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/manager"
56
        "sigs.k8s.io/controller-runtime/pkg/predicate"
57
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
58
        "sigs.k8s.io/controller-runtime/pkg/source"
59

60
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
61
        "kubevirt.io/containerized-data-importer/pkg/common"
62
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
63
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
64
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
65
        "kubevirt.io/containerized-data-importer/pkg/operator"
66
        "kubevirt.io/containerized-data-importer/pkg/util"
67
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
68
)
69

70
const (
71
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
72
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
73
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
74
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
75
)
76

77
// DataImportCronReconciler members
78
type DataImportCronReconciler struct {
79
        client          client.Client
80
        uncachedClient  client.Client
81
        recorder        record.EventRecorder
82
        scheme          *runtime.Scheme
83
        log             logr.Logger
84
        image           string
85
        pullPolicy      string
86
        cdiNamespace    string
87
        installerLabels map[string]string
88
}
89

90
const (
91
        // AnnSourceDesiredDigest is the digest of the pending updated image
92
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
93
        // AnnImageStreamDockerRef is the ImageStream Docker reference
94
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
95
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
96
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
97
        // AnnLastCronTime is the cron last execution time stamp
98
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
99
        // AnnLastUseTime is the PVC last use time stamp
100
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
101
        // AnnStorageClass is the cron DV's storage class
102
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
103

104
        dataImportControllerName    = "dataimportcron-controller"
105
        digestSha256Prefix          = "sha256:"
106
        digestUIDPrefix             = "uid:"
107
        digestDvNameSuffixLength    = 12
108
        cronJobUIDSuffixLength      = 8
109
        defaultImportsToKeepPerCron = 3
110
)
111

112
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
113

114
// Reconcile loop for DataImportCronReconciler
115
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
116
        dataImportCron := &cdiv1.DataImportCron{}
1✔
117
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
118
                return reconcile.Result{}, err
1✔
119
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
120
                err := r.cleanup(ctx, req.NamespacedName)
1✔
121
                return reconcile.Result{}, err
1✔
122
        }
1✔
123
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
124
        if !shouldReconcile || err != nil {
1✔
125
                return reconcile.Result{}, err
×
126
        }
×
127

128
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
129
                return reconcile.Result{}, err
×
130
        }
×
131

132
        return r.update(ctx, dataImportCron)
1✔
133
}
134

135
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
136
        dataSource := &cdiv1.DataSource{}
1✔
137
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
138
                if k8serrors.IsNotFound(err) {
2✔
139
                        return true, nil
1✔
140
                }
1✔
141
                return false, err
×
142
        }
143
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
144
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
145
                return true, nil
1✔
146
        }
1✔
147
        otherCron := &cdiv1.DataImportCron{}
1✔
148
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
149
                if k8serrors.IsNotFound(err) {
2✔
150
                        return true, nil
1✔
151
                }
1✔
152
                return false, err
×
153
        }
154
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
155
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
156
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
157
                r.log.V(3).Info(msg)
1✔
158
                return false, nil
1✔
159
        }
1✔
160
        return true, nil
×
161
}
162

163
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
164
        if dataImportCron.Spec.Schedule == "" {
2✔
165
                return nil
1✔
166
        }
1✔
167
        if isControllerPolledSource(dataImportCron) {
2✔
168
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
169
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
170
                }
1✔
171
                return nil
1✔
172
        }
173
        if !isURLSource(dataImportCron) {
1✔
174
                return nil
×
175
        }
×
176
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
177
        if exists || err != nil {
2✔
178
                return err
1✔
179
        }
1✔
180
        cronJob, err := r.newCronJob(dataImportCron)
1✔
181
        if err != nil {
1✔
182
                return err
×
183
        }
×
184
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
185
                return err
×
186
        }
×
187
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
188
        if err != nil {
1✔
189
                return err
×
190
        }
×
191
        if err := r.client.Create(ctx, job); err != nil {
1✔
192
                return err
×
193
        }
×
194
        return nil
1✔
195
}
196

197
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
198
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
199
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
200
        }
×
201
        imageStream := &imagev1.ImageStream{}
1✔
202
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
203
        if err != nil {
2✔
204
                return nil, "", err
1✔
205
        }
1✔
206
        imageStreamNamespacedName := types.NamespacedName{
1✔
207
                Namespace: imageStreamNamespace,
1✔
208
                Name:      name,
1✔
209
        }
1✔
210
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
211
                return nil, "", err
1✔
212
        }
1✔
213
        return imageStream, tag, nil
1✔
214
}
215

216
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
217
        if imageStream == nil {
1✔
218
                return "", "", errors.Errorf("No ImageStream")
×
219
        }
×
220
        tags := imageStream.Status.Tags
1✔
221
        if len(tags) == 0 {
1✔
222
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
223
        }
×
224

225
        tagIdx := 0
1✔
226
        if len(imageStreamTag) > 0 {
2✔
227
                tagIdx = -1
1✔
228
                for i, tag := range tags {
2✔
229
                        if tag.Tag == imageStreamTag {
2✔
230
                                tagIdx = i
1✔
231
                                break
1✔
232
                        }
233
                }
234
        }
235
        if tagIdx == -1 {
2✔
236
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
237
        }
1✔
238

239
        if len(tags[tagIdx].Items) == 0 {
2✔
240
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
241
        }
1✔
242
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
243
}
244

245
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
246
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
247
                return imageStreamName, "", nil
1✔
248
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
249
                return subs[0], subs[1], nil
1✔
250
        }
1✔
251
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
252
}
253

254
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
255
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
256
        if nextTimeStr == "" {
1✔
257
                return r.setNextCronTime(dataImportCron)
×
258
        }
×
259
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
260
        if err != nil {
1✔
261
                return reconcile.Result{}, err
×
262
        }
×
263
        if nextTime.After(time.Now()) {
2✔
264
                return r.setNextCronTime(dataImportCron)
1✔
265
        }
1✔
266
        switch {
1✔
267
        case isImageStreamSource(dataImportCron):
1✔
268
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
269
                        return reconcile.Result{}, err
1✔
270
                }
1✔
271
        case isPvcSource(dataImportCron):
1✔
272
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
273
                        return reconcile.Result{}, err
1✔
274
                }
1✔
275
        case isNodePull(dataImportCron):
1✔
276
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
277
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
278
                } else if err != nil {
2✔
279
                        return reconcile.Result{}, err
×
280
                }
×
281
        }
282

283
        return r.setNextCronTime(dataImportCron)
1✔
284
}
285

286
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
287
        now := time.Now()
1✔
288
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
289
        if err != nil {
1✔
290
                return reconcile.Result{}, err
×
291
        }
×
292
        nextTime := expr.Next(now)
1✔
293
        requeueAfter := nextTime.Sub(now)
1✔
294
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
295
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
296
        return res, err
1✔
297
}
298

299
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
300
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
301
        return err == nil && regSource.ImageStream != nil
1✔
302
}
1✔
303

304
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
305
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
306
        return err == nil && regSource.URL != nil
1✔
307
}
1✔
308

309
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
310
        regSource, err := getCronRegistrySource(cron)
1✔
311
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
312
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
313
}
1✔
314

315
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
316
        if !isCronRegistrySource(cron) {
2✔
317
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
318
        }
1✔
319
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
320
}
321

322
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
323
        source := cron.Spec.Template.Spec.Source
1✔
324
        return source != nil && source.Registry != nil
1✔
325
}
1✔
326

327
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
328
        if !isPvcSource(cron) {
1✔
329
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
330
        }
×
331
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
332
}
333

334
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
335
        source := cron.Spec.Template.Spec.Source
1✔
336
        return source != nil && source.PVC != nil
1✔
337
}
1✔
338

339
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
340
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
341
}
1✔
342

343
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
344
        res := reconcile.Result{}
1✔
345

1✔
346
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
347
        if err != nil {
1✔
348
                return res, err
×
349
        }
×
350

351
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
352
        imports := dataImportCron.Status.CurrentImports
1✔
353
        importSucceeded := false
1✔
354

1✔
355
        dataVolume := dataImportCron.Spec.Template
1✔
356
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
357
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
358
        if err != nil {
1✔
359
                return res, err
×
360
        }
×
361
        if desiredStorageClass != nil {
2✔
362
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
363
                        return res, err
1✔
364
                }
1✔
365
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
366
                desiredSc := desiredStorageClass.Name
1✔
367
                if hasCurrent && currentSc != desiredSc {
2✔
368
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
369
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
370
                                return res, err
×
371
                        }
×
372
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
373
                }
374
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
375
        }
376
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
377
        if err != nil {
1✔
378
                return res, err
×
379
        }
×
380
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
381
        if err != nil {
1✔
382
                return res, err
×
383
        }
×
384

385
        handlePopulatedPvc := func() error {
2✔
386
                if pvc != nil {
2✔
387
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
388
                                return err
×
389
                        }
×
390
                }
391
                importSucceeded = true
1✔
392
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
393
                        return err
×
394
                }
×
395

396
                return nil
1✔
397
        }
398

399
        switch {
1✔
400
        case dv != nil:
1✔
401
                switch dv.Status.Phase {
1✔
402
                case cdiv1.Succeeded:
1✔
403
                        if err := handlePopulatedPvc(); err != nil {
1✔
404
                                return res, err
×
405
                        }
×
406
                case cdiv1.ImportScheduled:
1✔
407
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
408
                case cdiv1.ImportInProgress:
1✔
409
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
410
                default:
1✔
411
                        dvPhase := string(dv.Status.Phase)
1✔
412
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
413
                }
414
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
415
                if err := handlePopulatedPvc(); err != nil {
1✔
416
                        return res, err
×
417
                }
×
418
        case snapshot != nil:
1✔
419
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
420
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
421
                                return res, err
×
422
                        }
×
423
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
424
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
425
                }
426
                // Check if snapshot class changed
427
                if desiredStorageClass != nil {
2✔
428
                        changed, err := r.handleSnapshotClassChange(ctx, snapshot, desiredStorageClass.Name)
1✔
429
                        if err != nil {
1✔
NEW
430
                                return res, err
×
NEW
431
                        }
×
432
                        if changed {
1✔
NEW
433
                                return reconcile.Result{RequeueAfter: time.Second}, nil
×
NEW
434
                        }
×
435
                }
436
                // Below k8s 1.29 there's no way to know the source volume mode
437
                // Let's at least expose this info on our own snapshots
438
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
439
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
440
                        if err != nil {
1✔
441
                                return res, err
×
442
                        }
×
443
                        if volMode != nil {
2✔
444
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
445
                        }
1✔
446
                }
447
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
2✔
448
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
449
                if err != nil {
2✔
450
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
451
                                return res, err
1✔
452
                        }
453
                } else {
454
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
455
                }
2✔
456
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
457
                        return res, err
×
458
                }
×
459
                importSucceeded = true
1✔
460
        default:
1✔
461
                if len(imports) > 0 {
1✔
462
                        imports = imports[1:]
1✔
463
                        dataImportCron.Status.CurrentImports = imports
×
464
                }
×
465
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
466
        }
1✔
467

2✔
468
        if importSucceeded {
1✔
469
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
470
                        return res, err
1✔
471
                }
1✔
472
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
473
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
474
                        return res, err
2✔
475
                }
1✔
476
        }
×
477

×
478
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
479
                return res, err
1✔
480
        }
×
481

×
482
        // Skip if schedule is disabled
483
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
484
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
485
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
×
486
                if err != nil {
×
487
                        return pollRes, err
488
                }
489
                res = pollRes
2✔
490
        }
1✔
491

1✔
492
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
2✔
493
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
494
        if digestUpdated {
1✔
495
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
496
                if dv != nil {
497
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
498
                                return res, err
1✔
499
                        }
1✔
500
                }
2✔
501
                if importSucceeded || len(imports) == 0 {
1✔
502
                        if err := r.createImportDataVolume(ctx, dataImportCron, desiredStorageClass); err != nil {
1✔
503
                                return res, err
×
504
                        }
×
505
                }
×
506
        } else if importSucceeded {
507
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
2✔
508
                        return res, err
2✔
509
                }
1✔
510
        } else if len(imports) > 0 {
1✔
511
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
512
        } else {
2✔
513
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
514
        }
×
515

×
516
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
2✔
517
                return res, err
1✔
518
        }
2✔
519

1✔
520
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
1✔
521
                if err := r.client.Update(ctx, dataImportCron); err != nil {
522
                        return res, err
1✔
523
                }
×
524
        }
×
525
        return res, nil
526
}
2✔
527

1✔
528
// Returns the current import DV if exists, and the last imported PVC
×
529
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
×
530
        imports := cron.Status.CurrentImports
531
        if len(imports) == 0 {
1✔
532
                return nil, nil, nil
533
        }
534

535
        dvName := imports[0].DataVolumeName
1✔
536
        dv := &cdiv1.DataVolume{}
1✔
537
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
538
                if !k8serrors.IsNotFound(err) {
1✔
539
                        return nil, nil, err
1✔
540
                }
541
                dv = nil
1✔
542
        }
1✔
543

2✔
544
        pvc := &corev1.PersistentVolumeClaim{}
1✔
545
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
×
546
                if !k8serrors.IsNotFound(err) {
×
547
                        return nil, nil, err
1✔
548
                }
549
                pvc = nil
550
        }
1✔
551
        return dv, pvc, nil
2✔
552
}
1✔
553

×
554
// Returns the current import DV if exists, and the last imported PVC
×
555
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
556
        imports := cron.Status.CurrentImports
557
        if len(imports) == 0 {
1✔
558
                return nil, nil
559
        }
560

561
        snapName := imports[0].DataVolumeName
1✔
562
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
563
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
564
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
565
                        return nil, err
1✔
566
                }
567
                return nil, nil
1✔
568
        }
1✔
569

2✔
570
        return snapshot, nil
1✔
571
}
×
572

×
573
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
574
        dataSourceName := dataImportCron.Spec.ManagedDataSource
575
        dataSource := &cdiv1.DataSource{}
576
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
1✔
577
                return nil, err
578
        }
579
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
580
                log := r.log.WithName("getCronManagedDataSource")
1✔
581
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
1✔
582
                return nil, ErrNotManagedByCron
2✔
583
        }
1✔
584
        return dataSource, nil
1✔
585
}
1✔
586

×
587
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
×
588
        objCopy := obj.DeepCopyObject()
×
589
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
×
590
        r.setDataImportCronResourceLabels(cron, obj)
1✔
591
        if !reflect.DeepEqual(obj, objCopy) {
592
                if err := r.client.Update(ctx, obj); err != nil {
593
                        return err
1✔
594
                }
1✔
595
        }
1✔
596
        return nil
1✔
597
}
2✔
598

1✔
599
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
600
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
601
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
602
                if cond.Status == corev1.ConditionFalse &&
1✔
603
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
604
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
605
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
606
                        dv.Labels[common.DataImportCronLabel] = ""
×
607
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
608
                                return err
×
609
                        }
×
610
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
611
                                return err
×
612
                        }
×
613
                        cron.Status.CurrentImports = nil
×
614
                }
×
615
        }
×
616
        return nil
×
617
}
×
618

×
619
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
×
620
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
621
        regSource, err := getCronRegistrySource(dataImportCron)
622
        if err != nil {
×
623
                return err
624
        }
625
        if regSource.ImageStream == nil {
1✔
626
                return nil
1✔
627
        }
1✔
628
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
629
        if err != nil {
×
630
                return err
×
631
        }
1✔
632
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
×
633
        if err != nil {
×
634
                return err
1✔
635
        }
2✔
636
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
637
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
1✔
638
                log.Info("Updating DataImportCron", "digest", digest)
1✔
639
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
2✔
640
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
641
        }
1✔
642
        return nil
1✔
643
}
2✔
644

1✔
645
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
646
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
647
        podName := getPollerPodName(cron)
1✔
648
        ns := cron.Namespace
1✔
649
        nn := types.NamespacedName{Name: podName, Namespace: ns}
650
        pod := &corev1.Pod{}
651

1✔
652
        if err := r.client.Get(ctx, nn, pod); err == nil {
1✔
653
                digest, err := fetchContainerImageDigest(pod)
1✔
654
                if err != nil || digest == "" {
1✔
655
                        return false, err
1✔
656
                }
1✔
657
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
658
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
659
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
660
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
661
                }
×
662
                return true, r.client.Delete(ctx, pod)
×
663
        } else if cc.IgnoreNotFound(err) != nil {
1✔
664
                return false, err
2✔
665
        }
1✔
666

1✔
667
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
668
        if err != nil {
1✔
669
                return false, err
1✔
670
        }
×
671
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
×
672
        if platform != nil && platform.Architecture != "" {
673
                if workloadNodePlacement.NodeSelector == nil {
1✔
674
                        workloadNodePlacement.NodeSelector = map[string]string{}
1✔
675
                }
×
676
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
677
        }
1✔
678

1✔
679
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
×
680

×
681
        pod = &corev1.Pod{
×
682
                ObjectMeta: metav1.ObjectMeta{
×
683
                        Name:      podName,
684
                        Namespace: ns,
685
                        OwnerReferences: []metav1.OwnerReference{
1✔
686
                                {
1✔
687
                                        APIVersion:         cron.APIVersion,
1✔
688
                                        Kind:               cron.Kind,
1✔
689
                                        Name:               cron.Name,
1✔
690
                                        UID:                cron.UID,
1✔
691
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
692
                                        Controller:         ptr.To[bool](true),
1✔
693
                                },
1✔
694
                        },
1✔
695
                },
1✔
696
                Spec: corev1.PodSpec{
1✔
697
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
698
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
699
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
700
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
701
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
702
                        Volumes: []corev1.Volume{
1✔
703
                                {
1✔
704
                                        Name: "shared-volume",
1✔
705
                                        VolumeSource: corev1.VolumeSource{
1✔
706
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
707
                                        },
1✔
708
                                },
1✔
709
                        },
1✔
710
                        InitContainers: []corev1.Container{
1✔
711
                                {
1✔
712
                                        Name:                     "init",
1✔
713
                                        Image:                    r.image,
1✔
714
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
715
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
716
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
717
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
718
                                },
1✔
719
                        },
1✔
720
                        Containers: []corev1.Container{
1✔
721
                                {
1✔
722
                                        Name:                     "image-container",
1✔
723
                                        Image:                    containerImage,
1✔
724
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
725
                                        Command:                  []string{"/shared/server", "-h"},
1✔
726
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
727
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
728
                                },
1✔
729
                        },
1✔
730
                },
1✔
731
        }
1✔
732

1✔
733
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
734
        if pod.Spec.SecurityContext != nil {
1✔
735
                pod.Spec.SecurityContext.FSGroup = nil
1✔
736
        }
1✔
737

1✔
738
        return false, r.client.Create(ctx, pod)
1✔
739
}
1✔
740

2✔
741
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
742
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
743
                return "", nil
744
        }
1✔
745

746
        status := pod.Status.ContainerStatuses[0]
747
        if status.State.Waiting != nil {
1✔
748
                reason := status.State.Waiting.Reason
1✔
749
                switch reason {
×
750
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
751
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
752
                }
1✔
753
                return "", nil
1✔
754
        }
×
755

×
756
        if status.State.Terminated == nil {
×
757
                return "", nil
×
758
        }
759

×
760
        imageID := status.ImageID
761
        if imageID == "" {
762
                return "", errors.Errorf("Container has no imageID")
1✔
763
        }
×
764
        idx := strings.Index(imageID, digestSha256Prefix)
×
765
        if idx < 0 {
766
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
1✔
767
        }
1✔
768

×
769
        return imageID[idx:], nil
×
770
}
1✔
771

1✔
772
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
×
773
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
×
774
        pvcSource, err := getCronPvcSource(dataImportCron)
775
        if err != nil {
1✔
776
                return err
777
        }
778
        ns := pvcSource.Namespace
1✔
779
        if ns == "" {
1✔
780
                ns = dataImportCron.Namespace
1✔
781
        }
1✔
782
        pvc := &corev1.PersistentVolumeClaim{}
×
783
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
×
784
                return err
1✔
785
        }
2✔
786
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
787
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
788
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
1✔
789
                log.Info("Updating DataImportCron", "digest", digest)
2✔
790
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
791
        }
1✔
792
        return nil
1✔
793
}
1✔
794

2✔
795
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
796
        log := r.log.WithName("updateDataSource")
1✔
797
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
798
        if err != nil {
1✔
799
                if k8serrors.IsNotFound(err) {
800
                        dataSource = r.newDataSource(dataImportCron)
801
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
802
                                return err
1✔
803
                        }
1✔
804
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
2✔
805
                } else if errors.Is(err, ErrNotManagedByCron) {
2✔
806
                        return nil
1✔
807
                } else {
1✔
808
                        return err
×
809
                }
×
810
        }
1✔
811
        dataSourceCopy := dataSource.DeepCopy()
×
812
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
×
813

×
814
        sourcePVC := dataImportCron.Status.LastImportedPVC
×
815
        populateDataSource(format, dataSource, sourcePVC)
×
816

817
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
1✔
818
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
819
                        return err
1✔
820
                }
1✔
821
        }
1✔
822

1✔
823
        return nil
2✔
824
}
1✔
825

×
826
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
×
827
        if sourcePVC == nil {
828
                return
829
        }
1✔
830

831
        switch format {
832
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
833
                dataSource.Spec.Source = cdiv1.DataSourceSource{
2✔
834
                        PVC: sourcePVC,
1✔
835
                }
1✔
836
        case cdiv1.DataImportCronSourceFormatSnapshot:
837
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
838
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
839
                                Namespace: sourcePVC.Namespace,
1✔
840
                                Name:      sourcePVC.Name,
1✔
841
                        },
1✔
842
                }
1✔
843
        }
1✔
844
}
1✔
845

1✔
846
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
847
        if dataImportCron.Status.CurrentImports == nil {
1✔
848
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
1✔
849
        }
850
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
851
                Namespace: dataImportCron.Namespace,
852
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
853
        }
1✔
854
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
×
855
                dataImportCron.Status.LastImportedPVC = sourcePVC
×
856
                now := metav1.Now()
1✔
857
                dataImportCron.Status.LastImportTimestamp = &now
1✔
858
        }
1✔
859
        return nil
1✔
860
}
2✔
861

1✔
862
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
863
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
864
        if lastTimeStr == "" {
1✔
865
                return nil
1✔
866
        }
867
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
868
        if err != nil {
1✔
869
                return err
1✔
870
        }
2✔
871
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
1✔
872
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
873
        }
1✔
874
        return nil
1✔
875
}
×
876

×
877
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass *storagev1.StorageClass) error {
2✔
878
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
879
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
880
        if digest == "" {
1✔
881
                return nil
882
        }
883
        dvName, err := createDvName(dataSourceName, digest)
1✔
884
        if err != nil {
1✔
885
                return err
1✔
886
        }
1✔
887

×
888
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
×
889
        for _, src := range sources {
1✔
890
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
891
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
892
                                return err
1✔
893
                        }
894
                } else {
1✔
895
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
2✔
896
                                return err
2✔
897
                        }
1✔
898
                        // If source exists don't create DV
×
899
                        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
×
900
                        return nil
1✔
901
                }
1✔
902
        }
×
903

×
904
        storageProfile := &cdiv1.StorageProfile{}
905
        if desiredStorageClass != nil {
1✔
906
                if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
907
                        return err
908
                }
909
        }
910
        dv := r.newSourceDataVolume(dataImportCron, dvName, storageProfile)
1✔
911
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
2✔
912
                return err
1✔
UNCOV
913
        } else if !allowed {
×
UNCOV
914
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
×
915
                        "Not authorized to create DataVolume", notAuthorized)
916
                return nil
1✔
917
        }
1✔
918
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
×
919
                return err
2✔
920
        }
1✔
921
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
922

1✔
923
        return nil
1✔
924
}
1✔
925

×
926
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
×
927
        if !isPvcSource(dataImportCron) {
1✔
928
                return true, nil
1✔
929
        }
1✔
930
        saName := "default"
931
        if dataImportCron.Spec.ServiceAccountName != nil {
932
                saName = *dataImportCron.Spec.ServiceAccountName
1✔
933
        }
2✔
934
        if resp, err := dv.AuthorizeSA(dv.Namespace, dv.Name, &authProxy{r.client}, dataImportCron.Namespace, saName); err != nil {
1✔
935
                return false, err
1✔
936
        } else if !resp.Allowed {
1✔
937
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
1✔
938
                return false, nil
×
939
        }
×
940

1✔
941
        return true, nil
×
942
}
2✔
943

1✔
944
type authProxy struct {
1✔
945
        client client.Client
1✔
946
}
947

1✔
948
func (p *authProxy) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
949
        if err := p.client.Create(context.TODO(), sar); err != nil {
950
                return nil, err
951
        }
952
        return sar, nil
953
}
954

1✔
955
func (p *authProxy) GetNamespace(name string) (*corev1.Namespace, error) {
1✔
956
        ns := &corev1.Namespace{}
×
957
        if err := p.client.Get(context.TODO(), types.NamespacedName{Name: name}, ns); err != nil {
×
958
                return nil, err
1✔
959
        }
960
        return ns, nil
961
}
1✔
962

1✔
963
func (p *authProxy) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
1✔
964
        das := &cdiv1.DataSource{}
×
965
        if err := p.client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, das); err != nil {
×
966
                return nil, err
1✔
967
        }
968
        return das, nil
969
}
×
970

×
971
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
×
972
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
×
973
        if !ok {
×
974
                // nothing to delete
×
975
                return nil
976
        }
977
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
978
        if err != nil {
1✔
979
                return err
1✔
980
        }
×
981
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
×
982
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
×
983
        for _, src := range sources {
1✔
984
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
985
                        return err
×
986
                }
×
987
        }
1✔
988
        for _, src := range sources {
1✔
989
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
2✔
990
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
1✔
991
                }
×
992
        }
×
993
        // Only update desired storage class once garbage collection went through
994
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
2✔
995
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
996
        if err != nil {
×
997
                return err
×
998
        }
999

1000
        return nil
1✔
1001
}
1✔
1002

1✔
1003
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
×
1004
        switch format {
×
1005
        case cdiv1.DataImportCronSourceFormatPvc:
1006
                return nil
1✔
1007
        case cdiv1.DataImportCronSourceFormatSnapshot:
1008
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1009
        default:
1✔
1010
                return fmt.Errorf("unknown source format for snapshot")
1✔
1011
        }
1✔
1012
}
1✔
1013

1✔
1014
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
1015
        if pvc == nil {
×
1016
                return nil
×
1017
        }
1018
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
1019
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1020
                return nil
1✔
1021
        }
1✔
1022
        storageProfile := &cdiv1.StorageProfile{}
×
1023
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
×
1024
                return err
2✔
1025
        }
1✔
1026

1✔
1027
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
1028
                ObjectMeta: metav1.ObjectMeta{
1✔
1029
                        Name:      pvc.Name,
1✔
UNCOV
1030
                        Namespace: dataImportCron.Namespace,
×
1031
                        Labels: map[string]string{
×
1032
                                common.CDILabelKey:       common.CDILabelValue,
1033
                                common.CDIComponentLabel: "",
1✔
1034
                        },
1✔
1035
                },
1✔
1036
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1037
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1038
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1039
                        },
1✔
1040
                },
1✔
1041
        }
1✔
1042
        // Select VolumeSnapshotClass for boot source snapshot
1✔
1043
        snapshotClassName, err := r.getSnapshotClassForDataImportCron(pvc, storageProfile)
1✔
1044
        if err != nil {
1✔
1045
                return err
1✔
1046
        }
1✔
1047
        if snapshotClassName != "" {
1✔
1048
                desiredSnapshot.Spec.VolumeSnapshotClassName = &snapshotClassName
1✔
1049
        }
1✔
1050
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
UNCOV
1051
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
×
UNCOV
1052

×
1053
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
2✔
1054
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
1✔
1055
                if !k8serrors.IsNotFound(err) {
1✔
1056
                        return err
1✔
1057
                }
1✔
1058
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1059
                if pvc.Spec.VolumeMode != nil {
1✔
1060
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
2✔
1061
                }
1✔
1062
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
×
1063
                        return err
×
1064
                }
1✔
1065
        } else {
1✔
1066
                if cc.IsSnapshotReady(currentSnapshot) {
1✔
1067
                        // Clean up DV/PVC as they are not needed anymore
2✔
1068
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1069
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
1070
                                return err
2✔
1071
                        }
1✔
1072
                }
1✔
1073
        }
1✔
1074

×
1075
        return nil
×
1076
}
1✔
1077

2✔
1078
func (r *DataImportCronReconciler) handleSnapshotClassChange(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot, storageClassName string) (bool, error) {
1✔
1079
        sp := &cdiv1.StorageProfile{}
1✔
1080
        if err := r.client.Get(ctx, types.NamespacedName{Name: storageClassName}, sp); err != nil {
1✔
NEW
1081
                return false, client.IgnoreNotFound(err)
×
NEW
1082
        }
×
1083

1084
        desiredVSC, err := r.getSnapshotClassForDataImportCron(nil, sp)
1085
        if err != nil {
1086
                return false, err
1✔
1087
        }
1088
        actualVSC := ""
1089
        if snapshot.Spec.VolumeSnapshotClassName != nil {
1✔
1090
                actualVSC = *snapshot.Spec.VolumeSnapshotClassName
1✔
1091
        }
1✔
NEW
1092
        if desiredVSC == "" || actualVSC == desiredVSC {
×
NEW
1093
                return false, nil
×
1094
        }
1095

1✔
1096
        r.log.Info("Snapshot class changed, deleting", "name", snapshot.Name, "from", actualVSC, "to", desiredVSC)
1✔
NEW
1097
        if err := r.client.Delete(ctx, snapshot); err != nil {
×
NEW
1098
                return false, client.IgnoreNotFound(err)
×
1099
        }
1✔
1100
        return true, nil
1✔
NEW
1101
}
×
NEW
1102

×
1103
// getSnapshotClassForDataImportCron returns the VolumeSnapshotClass name to use for DataImportCron snapshots.
2✔
1104
func (r *DataImportCronReconciler) getSnapshotClassForDataImportCron(pvc *corev1.PersistentVolumeClaim, storageProfile *cdiv1.StorageProfile) (string, error) {
1✔
1105
        if vscName := storageProfile.Annotations[cc.AnnSnapshotClassForDataImportCron]; vscName != "" {
1✔
1106
                return vscName, nil
NEW
1107
        }
×
NEW
1108
        return cc.GetSnapshotClassForSmartClone(pvc, &storageProfile.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
×
NEW
1109
}
×
NEW
1110

×
UNCOV
1111
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
×
1112
        dataImportCron.Status.SourceFormat = &format
1113

1114
        switch format {
1115
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1116
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
2✔
1117
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1118
                if snapshot == nil {
1✔
1119
                        // Snapshot create/update will trigger reconcile
1✔
1120
                        return nil
1121
                }
1122
                if cc.IsSnapshotReady(snapshot) {
1✔
1123
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1124
                } else {
1✔
1125
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1126
                }
1✔
1127
        default:
1✔
1128
                return fmt.Errorf("unknown source format for snapshot")
1✔
1129
        }
2✔
1130

1✔
1131
        return nil
1✔
1132
}
1✔
1133

2✔
1134
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1135
        format := cdiv1.DataImportCronSourceFormatPvc
2✔
1136
        if desiredStorageClass == nil {
1✔
1137
                return format, nil
1✔
1138
        }
×
1139

×
1140
        storageProfile := &cdiv1.StorageProfile{}
1141
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1142
                return format, err
1✔
1143
        }
1144
        if storageProfile.Status.DataImportCronSourceFormat != nil {
1145
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1146
        }
1✔
1147

2✔
1148
        return format, nil
1✔
1149
}
1✔
1150

1151
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1152
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1153
                return nil
×
1154
        }
×
1155
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
2✔
1156
        if err != nil {
1✔
1157
                return err
1✔
1158
        }
1159

1✔
1160
        maxImports := defaultImportsToKeepPerCron
1161

1162
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
1✔
1163
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1164
        }
×
1165

×
1166
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1167
                return err
1✔
1168
        }
×
1169
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
×
1170
                return err
1171
        }
1✔
1172

1✔
1173
        return nil
2✔
1174
}
1✔
1175

1✔
1176
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1177
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1178

×
1179
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
×
1180
                return err
1✔
1181
        }
×
1182
        if len(pvcList.Items) > maxImports {
×
1183
                sort.Slice(pvcList.Items, func(i, j int) bool {
1184
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1185
                })
1186
                for _, pvc := range pvcList.Items[maxImports:] {
1187
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1188
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1189
                                return err
1✔
1190
                        }
1✔
1191
                }
×
1192
        }
×
1193

2✔
1194
        dvList := &cdiv1.DataVolumeList{}
2✔
1195
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1196
                return err
1✔
1197
        }
2✔
1198

1✔
1199
        if len(dvList.Items) > maxImports {
1✔
1200
                for _, dv := range dvList.Items {
×
1201
                        pvc := &corev1.PersistentVolumeClaim{}
×
1202
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1203
                                return err
1204
                        }
1205

1✔
1206
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
1✔
1207
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
×
1208
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
×
1209
                                        return err
1210
                                }
2✔
1211
                        }
2✔
1212
                }
1✔
1213
        }
1✔
1214

×
1215
        return nil
×
1216
}
1217

2✔
1218
// deleteDvPvc deletes DV or PVC if DV was GCed
1✔
1219
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1220
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
×
1221
        dv := &cdiv1.DataVolume{ObjectMeta: om}
×
1222
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
1223
                return err
1224
        }
1225
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1226
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1227
                return err
1228
        }
1229
        return nil
1230
}
1✔
1231

1✔
1232
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1233
        snapList := &snapshotv1.VolumeSnapshotList{}
2✔
1234

1✔
1235
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1236
                if meta.IsNoMatchError(err) {
1✔
1237
                        return nil
1✔
1238
                }
×
1239
                return err
×
1240
        }
1✔
1241
        if len(snapList.Items) > maxImports {
1242
                sort.Slice(snapList.Items, func(i, j int) bool {
1243
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
1✔
1244
                })
1✔
1245
                for _, snap := range snapList.Items[maxImports:] {
1✔
1246
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
1✔
1247
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1248
                                return err
×
1249
                        }
×
1250
                }
×
1251
        }
1252

1✔
1253
        return nil
×
1254
}
×
1255

×
1256
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
×
1257
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
×
1258
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
×
1259

×
1260
        if err := r.deleteJobs(ctx, cron); err != nil {
×
1261
                return err
1262
        }
1263
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1264
        if err != nil {
1✔
1265
                return err
1266
        }
1267
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1268
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1269
                return err
1✔
1270
        }
1✔
1271
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1272
                return err
×
1273
        }
×
1274
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1275
                return err
1✔
1276
        }
×
1277
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
×
1278
                return err
1✔
1279
        }
1✔
1280
        return nil
×
1281
}
×
1282

1✔
1283
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
×
1284
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
×
1285
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1286
        if err != nil {
×
1287
                return err
×
1288
        }
1✔
1289
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
×
1290
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
×
1291
                return err
1✔
1292
        }
1293
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1294
                return err
1✔
1295
        }
1✔
1296

1✔
1297
        return nil
1✔
1298
}
×
1299

×
1300
// NewDataImportCronController creates a new instance of the DataImportCron controller
1✔
1301
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
1✔
1302
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1303
                Scheme: mgr.GetScheme(),
×
1304
                Mapper: mgr.GetRESTMapper(),
1✔
1305
        })
×
1306
        if err != nil {
×
1307
                return nil, err
1308
        }
1✔
1309
        reconciler := &DataImportCronReconciler{
1310
                client:          mgr.GetClient(),
1311
                uncachedClient:  uncachedClient,
1312
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1313
                scheme:          mgr.GetScheme(),
×
1314
                log:             log.WithName(dataImportControllerName),
×
1315
                image:           importerImage,
×
1316
                pullPolicy:      pullPolicy,
×
1317
                cdiNamespace:    util.GetNamespace(),
×
1318
                installerLabels: installerLabels,
×
1319
        }
×
1320
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1321
                MaxConcurrentReconciles: 3,
×
1322
                Reconciler:              reconciler,
×
1323
        })
×
1324
        if err != nil {
×
1325
                return nil, err
×
1326
        }
×
1327
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1328
                return nil, err
×
1329
        }
×
1330
        log.Info("Initialized DataImportCron controller")
×
1331
        return dataImportCronController, nil
×
1332
}
×
1333

×
1334
func getCronName(obj client.Object) string {
×
1335
        return obj.GetLabels()[common.DataImportCronLabel]
×
1336
}
×
1337

×
1338
func getCronNs(obj client.Object) string {
×
1339
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1340
}
×
1341

×
1342
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1343
        if cronName := getCronName(obj); cronName != "" {
1344
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
1345
        }
×
1346
        return nil
×
1347
}
×
1348

1349
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1350
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1351
                return err
×
1352
        }
1353

×
1354
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1355
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1356
                // Otherwise we risk losing the storage profile event
×
1357
                var crons cdiv1.DataImportCronList
×
1358
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
1359
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
1360
                        return nil
×
1361
                }
×
1362
                // Storage profiles are 1:1 to storage classes
×
1363
                scName := obj.GetName()
×
1364
                var reqs []reconcile.Request
1365
                for _, cron := range crons.Items {
×
1366
                        dataVolume := cron.Spec.Template
×
1367
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1368
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1369
                        if err != nil || templateSc == nil {
×
1370
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1371
                                return reqs
×
1372
                        }
×
1373
                        if templateSc.Name == scName {
1374
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1375
                        }
×
1376
                }
×
1377
                return reqs
×
1378
        }
×
1379

×
1380
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1381
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1382
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1383
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1384
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1385
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1386
                },
×
1387
        )); err != nil {
1388
                return err
×
1389
        }
1390

1391
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1392
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1393
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1394
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1395
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1396
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1397
                },
1398
        )); err != nil {
×
1399
                return err
×
1400
        }
×
1401

1402
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1403
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1404
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1405
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1406
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1407
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1408
                },
1409
        )); err != nil {
×
1410
                return err
×
1411
        }
×
1412

1413
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1414
                return err
×
1415
        }
×
1416

×
1417
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1418
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1419
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
1420
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1421
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1422
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1423
                                return dicRelevantFieldsChanged(e.ObjectOld, e.ObjectNew)
1424
                        },
×
1425
                },
×
1426
        )); err != nil {
×
1427
                return err
1428
        }
×
1429

×
1430
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1431
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1432
        }
×
1433

×
1434
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1435
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1436
                predicate.TypedFuncs[*batchv1.CronJob]{
1437
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1438
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1439
                        },
×
1440
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
1441
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1442
                },
×
1443
        )); err != nil {
×
1444
                return err
1445
        }
×
1446

×
1447
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1448
                if meta.IsNoMatchError(err) {
×
1449
                        // Back out if there's no point to attempt watch
×
1450
                        return nil
×
1451
                }
×
1452
                if !cc.IsErrCacheNotStarted(err) {
×
1453
                        return err
1454
                }
×
1455
        }
×
1456
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1457
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
1458
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1459
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1460
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1461
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1462
                },
×
1463
        )); err != nil {
×
1464
                return err
×
1465
        }
×
1466

1467
        return nil
×
1468
}
×
1469

×
NEW
1470
func dicRelevantFieldsChanged(oldSp, newSp *cdiv1.StorageProfile) bool {
×
NEW
1471
        sourceFormatChanged := oldSp.Status.DataImportCronSourceFormat != newSp.Status.DataImportCronSourceFormat
×
NEW
1472
        rwoAnnotationChanged := oldSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] != newSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron]
×
1473
        snapshotClassChanged := oldSp.Annotations[cc.AnnSnapshotClassForDataImportCron] != newSp.Annotations[cc.AnnSnapshotClassForDataImportCron]
NEW
1474
        return sourceFormatChanged || rwoAnnotationChanged || snapshotClassChanged
×
NEW
1475
}
×
NEW
1476

×
1477
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1478
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1479
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
1480
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
1481
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1482
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1483
                                log.Info("Update", "sc", obj.GetName(),
×
UNCOV
1484
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1485
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
UNCOV
1486
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1487
                                if err != nil {
1488
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
1489
                                }
×
1490
                                return reqs
×
1491
                        },
×
1492
                ),
×
1493
                predicate.TypedFuncs[*storagev1.StorageClass]{
×
1494
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1495
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1496
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1497
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1498
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1499
                        },
×
1500
                },
×
1501
        )); err != nil {
×
1502
                return err
1503
        }
1504

1505
        return nil
×
1506
}
×
1507

×
1508
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1509
        dicList := &cdiv1.DataImportCronList{}
×
1510
        if err := c.List(ctx, dicList); err != nil {
×
1511
                return nil, err
1512
        }
×
1513
        reqs := []reconcile.Request{}
×
1514
        for _, dic := range dicList.Items {
×
1515
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
1516
                        continue
×
1517
                }
1518

1519
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1520
        }
×
1521

×
1522
        return reqs, nil
×
1523
}
×
1524

×
1525
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
×
1526
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
×
1527
                return false, nil
×
1528
        }
1529

1530
        sc := pvc.Spec.StorageClassName
×
1531
        if sc == nil || *sc == desiredStorageClass {
1532
                return false, nil
1533
        }
×
1534

1535
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1536
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1537
                return false, err
2✔
1538
        }
1✔
1539

1✔
1540
        return true, nil
1541
}
1✔
1542

2✔
1543
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1544
        cronJob := &batchv1.CronJob{}
1✔
1545
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1546
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
1✔
1547
                return false, cc.IgnoreNotFound(err)
1✔
1548
        }
×
1549

×
1550
        cronJobCopy := cronJob.DeepCopy()
1551
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1552
                return false, err
1553
        }
1554

1✔
1555
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
1✔
1556
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1557
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
2✔
1558
                        return false, cc.IgnoreNotFound(err)
1✔
1559
                }
1✔
1560
        }
1561
        return true, nil
1✔
1562
}
1✔
1563

×
1564
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
×
1565
        cronJob := &batchv1.CronJob{
1566
                ObjectMeta: metav1.ObjectMeta{
2✔
1567
                        Name:      GetCronJobName(cron),
1✔
1568
                        Namespace: r.cdiNamespace,
1✔
1569
                },
×
1570
        }
×
1571
        if err := r.initCronJob(cron, cronJob); err != nil {
1572
                return nil, err
1✔
1573
        }
1574
        return cronJob, nil
1575
}
1✔
1576

1✔
1577
// InitPollerPod inits poller Pod
1✔
1578
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1579
        regSource, err := getCronRegistrySource(cron)
1✔
1580
        if err != nil {
1✔
1581
                return err
1✔
1582
        }
1✔
1583
        if regSource.URL == nil {
×
1584
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1585
        }
1✔
1586
        cdiConfig := &cdiv1.CDIConfig{}
1587
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1588
                return err
1589
        }
1✔
1590
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1591
        if err != nil {
1✔
1592
                return err
×
1593
        }
×
1594
        container := corev1.Container{
1✔
1595
                Name:  "cdi-source-update-poller",
×
1596
                Image: image,
×
1597
                Command: []string{
1✔
1598
                        "/usr/bin/cdi-source-update-poller",
1✔
1599
                        "-ns", cron.Namespace,
×
1600
                        "-cron", cron.Name,
×
1601
                        "-url", *regSource.URL,
1✔
1602
                },
1✔
1603
                ImagePullPolicy:          pullPolicy,
×
1604
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
×
1605
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1606
        }
1✔
1607

1✔
1608
        var volumes []corev1.Volume
1✔
1609
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1610
        if hasCertConfigMap {
1✔
1611
                vm := corev1.VolumeMount{
1✔
1612
                        Name:      CertVolName,
1✔
1613
                        MountPath: common.ImporterCertDir,
1✔
1614
                }
1✔
1615
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1616
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
1✔
1617
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
1✔
1618
        }
1✔
1619

1✔
1620
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
1✔
1621
                vm := corev1.VolumeMount{
1✔
1622
                        Name:      ProxyCertVolName,
×
1623
                        MountPath: common.ImporterProxyCertDir,
×
1624
                }
×
1625
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1626
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
×
1627
        }
×
1628

×
1629
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
×
1630
                container.Env = append(container.Env,
1631
                        corev1.EnvVar{
2✔
1632
                                Name: common.ImporterAccessKeyID,
1✔
1633
                                ValueFrom: &corev1.EnvVarSource{
1✔
1634
                                        SecretKeyRef: &corev1.SecretKeySelector{
1✔
1635
                                                LocalObjectReference: corev1.LocalObjectReference{
1✔
1636
                                                        Name: *regSource.SecretRef,
1✔
1637
                                                },
1✔
1638
                                                Key: common.KeyAccess,
1✔
1639
                                        },
1640
                                },
1✔
1641
                        },
×
1642
                        corev1.EnvVar{
×
1643
                                Name: common.ImporterSecretKey,
×
1644
                                ValueFrom: &corev1.EnvVarSource{
×
1645
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1646
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1647
                                                        Name: *regSource.SecretRef,
×
1648
                                                },
×
1649
                                                Key: common.KeySecret,
×
1650
                                        },
×
1651
                                },
×
1652
                        },
×
1653
                )
×
1654
        }
×
1655

×
1656
        addEnvVar := func(varName, value string) {
×
1657
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
×
1658
        }
×
1659

×
1660
        if insecureTLS {
×
1661
                addEnvVar(common.InsecureTLSVar, "true")
×
1662
        }
×
1663

×
1664
        addEnvVarFromImportProxyConfig := func(varName string) {
×
1665
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
×
1666
                        addEnvVar(varName, value)
1667
                }
2✔
1668
        }
1✔
1669

1✔
1670
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1671
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1672
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
×
1673

×
1674
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1675
        if err != nil {
2✔
1676
                return err
2✔
1677
        }
1✔
1678
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1679
        if err != nil {
1680
                return err
1681
        }
1✔
1682

1✔
1683
        podSpec := &pod.Spec
1✔
1684

1✔
1685
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1686
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1687
        podSpec.Containers = []corev1.Container{container}
×
1688
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
×
1689
        podSpec.Volumes = volumes
1✔
1690
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1691
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
×
1692
        podSpec.Tolerations = workloadNodePlacement.Tolerations
×
1693
        podSpec.Affinity = workloadNodePlacement.Affinity
1694

1✔
1695
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1696
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1697
        if podSpec.SecurityContext != nil {
1✔
1698
                podSpec.SecurityContext.FSGroup = nil
1✔
1699
        }
1✔
1700
        if podSpec.Containers[0].SecurityContext != nil {
1✔
1701
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1702
        }
1✔
1703

1✔
1704
        if pod.Labels == nil {
1✔
1705
                pod.Labels = map[string]string{}
1✔
1706
        }
1✔
1707
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1708

2✔
1709
        return nil
1✔
1710
}
1✔
1711

2✔
1712
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1713
        cronJobSpec := &cronJob.Spec
1✔
1714
        cronJobSpec.Schedule = cron.Spec.Schedule
1715
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
2✔
1716
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1717
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1718

1✔
1719
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1720
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1721
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1722
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1723

1✔
1724
        pod := &jobSpec.Template
1✔
1725
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1726
                return err
1✔
1727
        }
1✔
1728
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1729
                return err
1✔
1730
        }
1✔
1731
        return nil
1✔
1732
}
1✔
1733

1✔
1734
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1735
        job := &batchv1.Job{
1✔
1736
                ObjectMeta: metav1.ObjectMeta{
1✔
1737
                        Name:      GetInitialJobName(cron),
×
1738
                        Namespace: cronJob.Namespace,
×
1739
                },
1✔
1740
                Spec: cronJob.Spec.JobTemplate.Spec,
×
1741
        }
×
1742
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1743
                return nil, err
1744
        }
1745
        return job, nil
1✔
1746
}
1✔
1747

1✔
1748
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1749
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1750
                return err
1✔
1751
        }
1✔
1752
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1753
        labels := obj.GetLabels()
1✔
1754
        labels[common.DataImportCronNsLabel] = cron.Namespace
×
1755
        labels[common.DataImportCronLabel] = cron.Name
×
1756
        obj.SetLabels(labels)
1✔
1757
        return nil
1758
}
1759

1✔
1760
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string, storageProfile *cdiv1.StorageProfile) *cdiv1.DataVolume {
1✔
1761
        dv := cron.Spec.Template.DeepCopy()
×
1762
        if isCronRegistrySource(cron) {
×
1763
                var digestedURL string
1✔
1764
                if isURLSource(cron) {
1✔
1765
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1766
                } else if isImageStreamSource(cron) {
1✔
1767
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1768
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1769
                        dv.Spec.Source.Registry.ImageStream = nil
1770
                }
1771
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1772
        }
1✔
1773
        dv.Name = dataVolumeName
2✔
1774
        dv.Namespace = cron.Namespace
1✔
1775
        r.setDataImportCronResourceLabels(cron, dv)
2✔
1776
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1777
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
3✔
1778
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1779

1✔
1780
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
1✔
1781
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1782
        }
1✔
1783

1784
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1785

1✔
1786
        // Apply RWO access mode as default for DataImportCron (from StorageProfile annotation)
1✔
1787
        // Only applies if the DV doesn't already have AccessModes configured
1✔
1788
        if storageProfile != nil && storageProfile.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] == "true" {
1✔
1789
                if dv.Spec.Storage != nil && len(dv.Spec.Storage.AccessModes) == 0 {
1✔
1790
                        dv.Spec.Storage.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
1✔
1791
                }
2✔
1792
        }
1✔
1793

1✔
1794
        return dv
1795
}
1✔
1796

1✔
1797
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1798
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1799
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
2✔
1800
        labels := obj.GetLabels()
2✔
1801
        labels[common.DataImportCronLabel] = cron.Name
1✔
1802
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
1✔
1803
                labels[common.DataImportCronCleanupLabel] = "true"
1804
        }
1805
        obj.SetLabels(labels)
1✔
1806
}
1807

1808
func untagDigestedDockerURL(dockerURL string) string {
1✔
1809
        if u, err := url.Parse(dockerURL); err == nil {
1✔
1810
                url := u.Host + u.Path
1✔
1811
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1812
                // Check for tag
1✔
1813
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1814
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
1✔
1815
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1816
                        }
1✔
1817
                }
1818
        }
1819
        return dockerURL
1✔
1820
}
2✔
1821

1✔
1822
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1823
        if val := cron.Labels[ann]; val != "" {
1✔
1824
                cc.AddLabel(dv, ann, val)
2✔
1825
        }
2✔
1826
}
1✔
1827

1✔
1828
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1829
        if val := cron.Annotations[ann]; val != "" {
1830
                cc.AddAnnotation(dv, ann, val)
1✔
1831
        }
1832
}
1833

1✔
1834
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
2✔
1835
        dataSource := &cdiv1.DataSource{
1✔
1836
                ObjectMeta: metav1.ObjectMeta{
1✔
1837
                        Name:      cron.Spec.ManagedDataSource,
1838
                        Namespace: cron.Namespace,
1839
                },
1✔
1840
        }
1✔
1841
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
×
1842
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
×
1843
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1844
        return dataSource
1845
}
1✔
1846

1✔
1847
// Create DataVolume name based on the DataSource name + prefix of the digest
1✔
1848
func createDvName(prefix, digest string) (string, error) {
1✔
1849
        digestPrefix := ""
1✔
1850
        if strings.HasPrefix(digest, digestSha256Prefix) {
1✔
1851
                digestPrefix = digestSha256Prefix
1✔
1852
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
1✔
1853
                digestPrefix = digestUIDPrefix
1✔
1854
        } else {
1✔
1855
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1856
        }
1✔
1857
        fromIdx := len(digestPrefix)
1858
        toIdx := fromIdx + digestDvNameSuffixLength
1859
        if len(digest) < toIdx {
1✔
1860
                return "", errors.Errorf("Digest is too short")
1✔
1861
        }
2✔
1862
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1863
}
3✔
1864

1✔
1865
// GetCronJobName get CronJob name based on cron name and UID
2✔
1866
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1867
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1868
}
1✔
1869

1✔
1870
// GetInitialJobName get initial job name based on cron name and UID
2✔
1871
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1872
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1873
}
1✔
1874

1875
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1876
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1877
}
1✔
1878

1✔
1879
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1880
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1881
}
1882

1✔
1883
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1884
        dv := &cron.Spec.Template
1✔
1885

1886
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1887
                return explicitVolumeMode, nil
1✔
1888
        }
1✔
1889

1890
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1891
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1892
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1893
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1894
                        AccessModes:      accessModes,
1✔
1895
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1896
                        Resources: corev1.VolumeResourceRequirements{
1✔
1897
                                Requests: corev1.ResourceList{
1✔
1898
                                        // Doesn't matter
×
1899
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
×
1900
                                },
1901
                        },
1✔
1902
                },
1✔
1903
        }
1✔
1904
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1905
                return nil, err
1✔
1906
        }
1✔
1907

1✔
1908
        return inferredPvc.Spec.VolumeMode, nil
1✔
1909
}
1✔
1910

1✔
1911
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1✔
1912
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1913
        if dv.Spec.PVC != nil {
1✔
1914
                return dv.Spec.PVC.VolumeMode
1✔
1915
        }
1✔
1916

×
1917
        if dv.Spec.Storage != nil {
×
1918
                return dv.Spec.Storage.VolumeMode
1919
        }
1✔
1920

1921
        return nil
1922
}
1923

1✔
1924
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1✔
1925
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
×
1926
        if dv.Spec.PVC != nil {
×
1927
                return dv.Spec.PVC.AccessModes
1928
        }
2✔
1929

1✔
1930
        if dv.Spec.Storage != nil {
1✔
1931
                return dv.Spec.Storage.AccessModes
1932
        }
×
1933

1934
        return nil
1935
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc