• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5789

22 Jan 2026 12:45PM UTC coverage: 49.487% (+0.03%) from 49.454%
#5789

Pull #3991

travis-ci

noamasu
Add provisioner-aware DataImportCron configuration via StorageProfile annotations

Some storage provisioners have specific requirements when creating boot source snapshots. For example, GKE Persistent Disk needs a VolumeSnapshotClass with snapshot-type: images parameter, and snapshots must be taken from an RWO PVC.
This change makes CDI automatically detect these provisioner-specific needs and configure DataImportCron accordingly.
The StorageProfile controller now reconciles two new annotations based on the underlying provisioner: cdi.kubevirt.io/useReadWriteOnceForDataImportCron signals that RWO should be used for access mode, and cdi.kubevirt.io/snapshotClassForDataImportCron specifies which VolumeSnapshotClass to use for boot source snapshots.
The DataImportCron controller watches these annotations and ensures the correct VolumeSnapshotClass and access mode are used.

Signed-off-by: Noam Assouline <nassouli@redhat.com>
Pull Request #3991: Add provisioner-aware VolumeSnapshotClass selection and RWO access mode for DataImportCron

80 of 142 new or added lines in 3 files covered. (56.34%)

9 existing lines in 2 files now uncovered.

14709 of 29723 relevant lines covered (49.49%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.86
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        authorizationv1 "k8s.io/api/authorization/v1"
37
        batchv1 "k8s.io/api/batch/v1"
38
        corev1 "k8s.io/api/core/v1"
39
        storagev1 "k8s.io/api/storage/v1"
40
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
41
        "k8s.io/apimachinery/pkg/api/meta"
42
        "k8s.io/apimachinery/pkg/api/resource"
43
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44
        "k8s.io/apimachinery/pkg/labels"
45
        "k8s.io/apimachinery/pkg/runtime"
46
        "k8s.io/apimachinery/pkg/types"
47
        "k8s.io/client-go/tools/record"
48
        openapicommon "k8s.io/kube-openapi/pkg/common"
49
        "k8s.io/utils/ptr"
50

51
        "sigs.k8s.io/controller-runtime/pkg/client"
52
        "sigs.k8s.io/controller-runtime/pkg/controller"
53
        "sigs.k8s.io/controller-runtime/pkg/event"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/manager"
56
        "sigs.k8s.io/controller-runtime/pkg/predicate"
57
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
58
        "sigs.k8s.io/controller-runtime/pkg/source"
59

60
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
61
        "kubevirt.io/containerized-data-importer/pkg/common"
62
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
63
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
64
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
65
        "kubevirt.io/containerized-data-importer/pkg/operator"
66
        "kubevirt.io/containerized-data-importer/pkg/util"
67
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
68
)
69

70
const (
71
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
72
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
73
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
74
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
75
)
76

77
// DataImportCronReconciler members
78
type DataImportCronReconciler struct {
79
        client          client.Client
80
        uncachedClient  client.Client
81
        recorder        record.EventRecorder
82
        scheme          *runtime.Scheme
83
        log             logr.Logger
84
        image           string
85
        pullPolicy      string
86
        cdiNamespace    string
87
        installerLabels map[string]string
88
}
89

90
const (
91
        // AnnSourceDesiredDigest is the digest of the pending updated image
92
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
93
        // AnnImageStreamDockerRef is the ImageStream Docker reference
94
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
95
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
96
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
97
        // AnnLastCronTime is the cron last execution time stamp
98
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
99
        // AnnLastUseTime is the PVC last use time stamp
100
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
101
        // AnnStorageClass is the cron DV's storage class
102
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
103

104
        dataImportControllerName    = "dataimportcron-controller"
105
        digestSha256Prefix          = "sha256:"
106
        digestUIDPrefix             = "uid:"
107
        digestDvNameSuffixLength    = 12
108
        cronJobUIDSuffixLength      = 8
109
        defaultImportsToKeepPerCron = 3
110
)
111

112
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
113

114
// Reconcile loop for DataImportCronReconciler
115
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
116
        dataImportCron := &cdiv1.DataImportCron{}
1✔
117
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
118
                return reconcile.Result{}, err
1✔
119
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
120
                err := r.cleanup(ctx, req.NamespacedName)
1✔
121
                return reconcile.Result{}, err
1✔
122
        }
1✔
123
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
124
        if !shouldReconcile || err != nil {
1✔
125
                return reconcile.Result{}, err
×
126
        }
×
127

128
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
129
                return reconcile.Result{}, err
×
130
        }
×
131

132
        return r.update(ctx, dataImportCron)
1✔
133
}
134

135
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
136
        dataSource := &cdiv1.DataSource{}
1✔
137
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
138
                if k8serrors.IsNotFound(err) {
2✔
139
                        return true, nil
1✔
140
                }
1✔
141
                return false, err
×
142
        }
143
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
144
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
145
                return true, nil
1✔
146
        }
1✔
147
        otherCron := &cdiv1.DataImportCron{}
1✔
148
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
149
                if k8serrors.IsNotFound(err) {
2✔
150
                        return true, nil
1✔
151
                }
1✔
152
                return false, err
×
153
        }
154
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
155
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
156
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
157
                r.log.V(3).Info(msg)
1✔
158
                return false, nil
1✔
159
        }
1✔
160
        return true, nil
×
161
}
162

163
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
164
        if dataImportCron.Spec.Schedule == "" {
2✔
165
                return nil
1✔
166
        }
1✔
167
        if isControllerPolledSource(dataImportCron) {
2✔
168
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
169
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
170
                }
1✔
171
                return nil
1✔
172
        }
173
        if !isURLSource(dataImportCron) {
1✔
174
                return nil
×
175
        }
×
176
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
177
        if exists || err != nil {
2✔
178
                return err
1✔
179
        }
1✔
180
        cronJob, err := r.newCronJob(dataImportCron)
1✔
181
        if err != nil {
1✔
182
                return err
×
183
        }
×
184
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
185
                return err
×
186
        }
×
187
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
188
        if err != nil {
1✔
189
                return err
×
190
        }
×
191
        if err := r.client.Create(ctx, job); err != nil {
1✔
192
                return err
×
193
        }
×
194
        return nil
1✔
195
}
196

197
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
198
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
199
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
200
        }
×
201
        imageStream := &imagev1.ImageStream{}
1✔
202
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
203
        if err != nil {
2✔
204
                return nil, "", err
1✔
205
        }
1✔
206
        imageStreamNamespacedName := types.NamespacedName{
1✔
207
                Namespace: imageStreamNamespace,
1✔
208
                Name:      name,
1✔
209
        }
1✔
210
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
211
                return nil, "", err
1✔
212
        }
1✔
213
        return imageStream, tag, nil
1✔
214
}
215

216
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
217
        if imageStream == nil {
1✔
218
                return "", "", errors.Errorf("No ImageStream")
×
219
        }
×
220
        tags := imageStream.Status.Tags
1✔
221
        if len(tags) == 0 {
1✔
222
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
223
        }
×
224

225
        tagIdx := 0
1✔
226
        if len(imageStreamTag) > 0 {
2✔
227
                tagIdx = -1
1✔
228
                for i, tag := range tags {
2✔
229
                        if tag.Tag == imageStreamTag {
2✔
230
                                tagIdx = i
1✔
231
                                break
1✔
232
                        }
233
                }
234
        }
235
        if tagIdx == -1 {
2✔
236
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
237
        }
1✔
238

239
        if len(tags[tagIdx].Items) == 0 {
2✔
240
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
241
        }
1✔
242
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
243
}
244

245
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
246
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
247
                return imageStreamName, "", nil
1✔
248
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
249
                return subs[0], subs[1], nil
1✔
250
        }
1✔
251
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
252
}
253

254
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
255
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
256
        if nextTimeStr == "" {
1✔
257
                return r.setNextCronTime(dataImportCron)
×
258
        }
×
259
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
260
        if err != nil {
1✔
261
                return reconcile.Result{}, err
×
262
        }
×
263
        if nextTime.After(time.Now()) {
2✔
264
                return r.setNextCronTime(dataImportCron)
1✔
265
        }
1✔
266
        switch {
1✔
267
        case isImageStreamSource(dataImportCron):
1✔
268
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
269
                        return reconcile.Result{}, err
1✔
270
                }
1✔
271
        case isPvcSource(dataImportCron):
1✔
272
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
273
                        return reconcile.Result{}, err
1✔
274
                }
1✔
275
        case isNodePull(dataImportCron):
1✔
276
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
277
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
278
                } else if err != nil {
2✔
279
                        return reconcile.Result{}, err
×
280
                }
×
281
        }
282

283
        return r.setNextCronTime(dataImportCron)
1✔
284
}
285

286
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
287
        now := time.Now()
1✔
288
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
289
        if err != nil {
1✔
290
                return reconcile.Result{}, err
×
291
        }
×
292
        nextTime := expr.Next(now)
1✔
293
        requeueAfter := nextTime.Sub(now)
1✔
294
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
295
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
296
        return res, err
1✔
297
}
298

299
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
300
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
301
        return err == nil && regSource.ImageStream != nil
1✔
302
}
1✔
303

304
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
305
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
306
        return err == nil && regSource.URL != nil
1✔
307
}
1✔
308

309
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
310
        regSource, err := getCronRegistrySource(cron)
1✔
311
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
312
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
313
}
1✔
314

315
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
316
        if !isCronRegistrySource(cron) {
2✔
317
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
318
        }
1✔
319
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
320
}
321

322
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
323
        source := cron.Spec.Template.Spec.Source
1✔
324
        return source != nil && source.Registry != nil
1✔
325
}
1✔
326

327
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
328
        if !isPvcSource(cron) {
1✔
329
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
330
        }
×
331
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
332
}
333

334
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
335
        source := cron.Spec.Template.Spec.Source
1✔
336
        return source != nil && source.PVC != nil
1✔
337
}
1✔
338

339
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
340
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
341
}
1✔
342

343
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
344
        res := reconcile.Result{}
1✔
345

1✔
346
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
347
        if err != nil {
1✔
348
                return res, err
×
349
        }
×
350

351
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
352
        imports := dataImportCron.Status.CurrentImports
1✔
353
        importSucceeded := false
1✔
354

1✔
355
        dataVolume := dataImportCron.Spec.Template
1✔
356
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
357
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
358
        if err != nil {
1✔
359
                return res, err
×
360
        }
×
361
        if desiredStorageClass != nil {
2✔
362
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
363
                        return res, err
1✔
364
                }
1✔
365
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
366
                desiredSc := desiredStorageClass.Name
1✔
367
                if hasCurrent && currentSc != desiredSc {
2✔
368
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
369
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
370
                                return res, err
×
371
                        }
×
372
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
373
                }
374
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
375
        }
376
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
377
        if err != nil {
1✔
378
                return res, err
×
379
        }
×
380
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
381
        if err != nil {
1✔
382
                return res, err
×
383
        }
×
384

385
        handlePopulatedPvc := func() error {
2✔
386
                if pvc != nil {
2✔
387
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
388
                                return err
×
389
                        }
×
390
                }
391
                importSucceeded = true
1✔
392
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
393
                        return err
×
394
                }
×
395

396
                return nil
1✔
397
        }
398

399
        switch {
1✔
400
        case dv != nil:
1✔
401
                switch dv.Status.Phase {
1✔
402
                case cdiv1.Succeeded:
1✔
403
                        if err := handlePopulatedPvc(); err != nil {
1✔
404
                                return res, err
×
405
                        }
×
406
                case cdiv1.ImportScheduled:
1✔
407
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
408
                case cdiv1.ImportInProgress:
1✔
409
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
410
                default:
1✔
411
                        dvPhase := string(dv.Status.Phase)
1✔
412
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
413
                }
414
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
415
                if err := handlePopulatedPvc(); err != nil {
1✔
416
                        return res, err
×
417
                }
×
418
        case snapshot != nil:
1✔
419
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
420
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
421
                                return res, err
×
422
                        }
×
423
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
424
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
425
                }
426
                // Check if snapshot class changed
427
                if desiredStorageClass != nil {
2✔
428
                        changed, err := r.handleSnapshotClassChange(ctx, snapshot, desiredStorageClass.Name)
1✔
429
                        if err != nil {
1✔
NEW
430
                                return res, err
×
NEW
431
                        }
×
432
                        if changed {
1✔
NEW
433
                                return reconcile.Result{RequeueAfter: time.Second}, nil
×
NEW
434
                        }
×
435
                }
436
                // Below k8s 1.29 there's no way to know the source volume mode
437
                // Let's at least expose this info on our own snapshots
438
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
439
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
440
                        if err != nil {
1✔
441
                                return res, err
×
442
                        }
×
443
                        if volMode != nil {
2✔
444
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
445
                        }
1✔
446
                }
447
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
2✔
448
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
449
                if err != nil {
2✔
450
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
451
                                return res, err
1✔
452
                        }
453
                } else {
454
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
455
                }
2✔
456
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
457
                        return res, err
×
458
                }
×
459
                importSucceeded = true
1✔
460
        default:
1✔
461
                if len(imports) > 0 {
1✔
462
                        imports = imports[1:]
1✔
463
                        dataImportCron.Status.CurrentImports = imports
×
464
                }
×
465
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
466
        }
1✔
467

2✔
468
        if importSucceeded {
1✔
469
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
470
                        return res, err
1✔
471
                }
1✔
472
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
473
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
474
                        return res, err
2✔
475
                }
1✔
476
        }
×
477

×
478
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
479
                return res, err
1✔
480
        }
×
481

×
482
        // Skip if schedule is disabled
483
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
484
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
485
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
×
486
                if err != nil {
×
487
                        return pollRes, err
488
                }
489
                res = pollRes
2✔
490
        }
1✔
491

1✔
492
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
2✔
493
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
494
        if digestUpdated {
1✔
495
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
496
                if dv != nil {
497
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
498
                                return res, err
1✔
499
                        }
1✔
500
                }
2✔
501
                if importSucceeded || len(imports) == 0 {
1✔
502
                        if err := r.createImportDataVolume(ctx, dataImportCron, desiredStorageClass); err != nil {
1✔
503
                                return res, err
×
504
                        }
×
505
                }
×
506
        } else if importSucceeded {
507
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
2✔
508
                        return res, err
2✔
509
                }
1✔
510
        } else if len(imports) > 0 {
1✔
511
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
512
        } else {
2✔
513
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
514
        }
×
515

×
516
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
2✔
517
                return res, err
1✔
518
        }
2✔
519

1✔
520
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
1✔
521
                if err := r.client.Update(ctx, dataImportCron); err != nil {
522
                        return res, err
1✔
523
                }
×
524
        }
×
525
        return res, nil
526
}
2✔
527

1✔
528
// Returns the current import DV if exists, and the last imported PVC
×
529
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
×
530
        imports := cron.Status.CurrentImports
531
        if len(imports) == 0 {
1✔
532
                return nil, nil, nil
533
        }
534

535
        dvName := imports[0].DataVolumeName
1✔
536
        dv := &cdiv1.DataVolume{}
1✔
537
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
538
                if !k8serrors.IsNotFound(err) {
1✔
539
                        return nil, nil, err
1✔
540
                }
541
                dv = nil
1✔
542
        }
1✔
543

2✔
544
        pvc := &corev1.PersistentVolumeClaim{}
1✔
545
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
×
546
                if !k8serrors.IsNotFound(err) {
×
547
                        return nil, nil, err
1✔
548
                }
549
                pvc = nil
550
        }
1✔
551
        return dv, pvc, nil
2✔
552
}
1✔
553

×
554
// Returns the VolumeSnapshot named after the most recent import's DataVolume, or nil if there are no imports or no such snapshot exists
×
555
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
556
        imports := cron.Status.CurrentImports
557
        if len(imports) == 0 {
1✔
558
                return nil, nil
559
        }
560

561
        snapName := imports[0].DataVolumeName
1✔
562
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
563
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
564
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
565
                        return nil, err
1✔
566
                }
567
                return nil, nil
1✔
568
        }
1✔
569

2✔
570
        return snapshot, nil
1✔
571
}
×
572

×
573
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
574
        dataSourceName := dataImportCron.Spec.ManagedDataSource
575
        dataSource := &cdiv1.DataSource{}
576
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
1✔
577
                return nil, err
578
        }
579
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
580
                log := r.log.WithName("getCronManagedDataSource")
1✔
581
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
1✔
582
                return nil, ErrNotManagedByCron
2✔
583
        }
1✔
584
        return dataSource, nil
1✔
585
}
1✔
586

×
587
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
×
588
        objCopy := obj.DeepCopyObject()
×
589
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
×
590
        r.setDataImportCronResourceLabels(cron, obj)
1✔
591
        if !reflect.DeepEqual(obj, objCopy) {
592
                if err := r.client.Update(ctx, obj); err != nil {
593
                        return err
1✔
594
                }
1✔
595
        }
1✔
596
        return nil
1✔
597
}
2✔
598

1✔
599
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
600
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
601
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
602
                if cond.Status == corev1.ConditionFalse &&
1✔
603
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
604
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
605
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
606
                        dv.Labels[common.DataImportCronLabel] = ""
×
607
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
608
                                return err
×
609
                        }
×
610
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
611
                                return err
×
612
                        }
×
613
                        cron.Status.CurrentImports = nil
×
614
                }
×
615
        }
×
616
        return nil
×
617
}
×
618

×
619
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
×
620
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
621
        regSource, err := getCronRegistrySource(dataImportCron)
622
        if err != nil {
×
623
                return err
624
        }
625
        if regSource.ImageStream == nil {
1✔
626
                return nil
1✔
627
        }
1✔
628
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
629
        if err != nil {
×
630
                return err
×
631
        }
1✔
632
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
×
633
        if err != nil {
×
634
                return err
1✔
635
        }
2✔
636
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
637
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
1✔
638
                log.Info("Updating DataImportCron", "digest", digest)
1✔
639
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
2✔
640
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
641
        }
1✔
642
        return nil
1✔
643
}
2✔
644

1✔
645
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
646
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
647
        podName := getPollerPodName(cron)
1✔
648
        ns := cron.Namespace
1✔
649
        nn := types.NamespacedName{Name: podName, Namespace: ns}
650
        pod := &corev1.Pod{}
651

1✔
652
        if err := r.client.Get(ctx, nn, pod); err == nil {
1✔
653
                digest, err := fetchContainerImageDigest(pod)
1✔
654
                if err != nil || digest == "" {
1✔
655
                        return false, err
1✔
656
                }
1✔
657
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
658
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
659
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
660
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
661
                }
×
662
                return true, r.client.Delete(ctx, pod)
×
663
        } else if cc.IgnoreNotFound(err) != nil {
1✔
664
                return false, err
2✔
665
        }
1✔
666

1✔
667
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
668
        if err != nil {
1✔
669
                return false, err
1✔
670
        }
×
671
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
×
672
        if platform != nil && platform.Architecture != "" {
673
                if workloadNodePlacement.NodeSelector == nil {
1✔
674
                        workloadNodePlacement.NodeSelector = map[string]string{}
1✔
675
                }
×
676
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
677
        }
1✔
678

1✔
679
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
×
680

×
681
        pod = &corev1.Pod{
×
682
                ObjectMeta: metav1.ObjectMeta{
×
683
                        Name:      podName,
684
                        Namespace: ns,
685
                        OwnerReferences: []metav1.OwnerReference{
1✔
686
                                {
1✔
687
                                        APIVersion:         cron.APIVersion,
1✔
688
                                        Kind:               cron.Kind,
1✔
689
                                        Name:               cron.Name,
1✔
690
                                        UID:                cron.UID,
1✔
691
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
692
                                        Controller:         ptr.To[bool](true),
1✔
693
                                },
1✔
694
                        },
1✔
695
                },
1✔
696
                Spec: corev1.PodSpec{
1✔
697
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
698
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
699
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
700
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
701
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
702
                        Volumes: []corev1.Volume{
1✔
703
                                {
1✔
704
                                        Name: "shared-volume",
1✔
705
                                        VolumeSource: corev1.VolumeSource{
1✔
706
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
707
                                        },
1✔
708
                                },
1✔
709
                        },
1✔
710
                        InitContainers: []corev1.Container{
1✔
711
                                {
1✔
712
                                        Name:                     "init",
1✔
713
                                        Image:                    r.image,
1✔
714
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
715
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
716
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
717
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
718
                                },
1✔
719
                        },
1✔
720
                        Containers: []corev1.Container{
1✔
721
                                {
1✔
722
                                        Name:                     "image-container",
1✔
723
                                        Image:                    containerImage,
1✔
724
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
725
                                        Command:                  []string{"/shared/server", "-h"},
1✔
726
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
727
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
728
                                },
1✔
729
                        },
1✔
730
                },
1✔
731
        }
1✔
732

1✔
733
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
734
        if pod.Spec.SecurityContext != nil {
1✔
735
                pod.Spec.SecurityContext.FSGroup = nil
1✔
736
        }
1✔
737

1✔
738
        return false, r.client.Create(ctx, pod)
1✔
739
}
1✔
740

2✔
741
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
742
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
743
                return "", nil
744
        }
1✔
745

746
        status := pod.Status.ContainerStatuses[0]
747
        if status.State.Waiting != nil {
1✔
748
                reason := status.State.Waiting.Reason
1✔
749
                switch reason {
×
750
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
751
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
752
                }
1✔
753
                return "", nil
1✔
754
        }
×
755

×
756
        if status.State.Terminated == nil {
×
757
                return "", nil
×
758
        }
759

×
760
        imageID := status.ImageID
761
        if imageID == "" {
762
                return "", errors.Errorf("Container has no imageID")
1✔
763
        }
×
764
        idx := strings.Index(imageID, digestSha256Prefix)
×
765
        if idx < 0 {
766
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
1✔
767
        }
1✔
768

×
769
        return imageID[idx:], nil
×
770
}
1✔
771

1✔
772
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
×
773
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
×
774
        pvcSource, err := getCronPvcSource(dataImportCron)
775
        if err != nil {
1✔
776
                return err
777
        }
778
        ns := pvcSource.Namespace
1✔
779
        if ns == "" {
1✔
780
                ns = dataImportCron.Namespace
1✔
781
        }
1✔
782
        pvc := &corev1.PersistentVolumeClaim{}
×
783
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
×
784
                return err
1✔
785
        }
2✔
786
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
787
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
788
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
1✔
789
                log.Info("Updating DataImportCron", "digest", digest)
2✔
790
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
791
        }
1✔
792
        return nil
1✔
793
}
1✔
794

2✔
795
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
796
        log := r.log.WithName("updateDataSource")
1✔
797
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
798
        if err != nil {
1✔
799
                if k8serrors.IsNotFound(err) {
800
                        dataSource = r.newDataSource(dataImportCron)
801
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
802
                                return err
1✔
803
                        }
1✔
804
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
2✔
805
                } else if errors.Is(err, ErrNotManagedByCron) {
2✔
806
                        return nil
1✔
807
                } else {
1✔
808
                        return err
×
809
                }
×
810
        }
1✔
811
        dataSourceCopy := dataSource.DeepCopy()
×
812
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
×
813

×
814
        sourcePVC := dataImportCron.Status.LastImportedPVC
×
815
        populateDataSource(format, dataSource, sourcePVC)
×
816

817
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
1✔
818
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
819
                        return err
1✔
820
                }
1✔
821
        }
1✔
822

1✔
823
        return nil
2✔
824
}
1✔
825

×
826
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
×
827
        if sourcePVC == nil {
828
                return
829
        }
1✔
830

831
        switch format {
832
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
833
                dataSource.Spec.Source = cdiv1.DataSourceSource{
2✔
834
                        PVC: sourcePVC,
1✔
835
                }
1✔
836
        case cdiv1.DataImportCronSourceFormatSnapshot:
837
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
838
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
839
                                Namespace: sourcePVC.Namespace,
1✔
840
                                Name:      sourcePVC.Name,
1✔
841
                        },
1✔
842
                }
1✔
843
        }
1✔
844
}
1✔
845

1✔
846
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
847
        if dataImportCron.Status.CurrentImports == nil {
1✔
848
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
1✔
849
        }
850
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
851
                Namespace: dataImportCron.Namespace,
852
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
853
        }
1✔
854
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
×
855
                dataImportCron.Status.LastImportedPVC = sourcePVC
×
856
                now := metav1.Now()
1✔
857
                dataImportCron.Status.LastImportTimestamp = &now
1✔
858
        }
1✔
859
        return nil
1✔
860
}
2✔
861

1✔
862
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
863
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
864
        if lastTimeStr == "" {
1✔
865
                return nil
1✔
866
        }
867
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
868
        if err != nil {
1✔
869
                return err
1✔
870
        }
2✔
871
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
1✔
872
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
873
        }
1✔
874
        return nil
1✔
875
}
×
876

×
877
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass *storagev1.StorageClass) error {
2✔
878
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
879
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
880
        if digest == "" {
1✔
881
                return nil
882
        }
883
        dvName, err := createDvName(dataSourceName, digest)
1✔
884
        if err != nil {
1✔
885
                return err
1✔
886
        }
1✔
887

×
888
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
×
889
        for _, src := range sources {
1✔
890
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
891
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
892
                                return err
1✔
893
                        }
894
                } else {
1✔
895
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
2✔
896
                                return err
2✔
897
                        }
1✔
898
                        // If source exists don't create DV
×
899
                        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
×
900
                        return nil
1✔
901
                }
1✔
902
        }
×
903

×
904
        storageProfile := &cdiv1.StorageProfile{}
905
        if desiredStorageClass != nil {
1✔
906
                if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
907
                        return err
908
                }
909
        }
910
        dv := r.newSourceDataVolume(dataImportCron, dvName, storageProfile)
1✔
911
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
2✔
912
                return err
1✔
UNCOV
913
        } else if !allowed {
×
UNCOV
914
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
×
915
                        "Not authorized to create DataVolume", notAuthorized)
916
                return nil
1✔
917
        }
1✔
918
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
×
919
                return err
2✔
920
        }
1✔
921
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
922

1✔
923
        return nil
1✔
924
}
1✔
925

×
926
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
×
927
        if !isPvcSource(dataImportCron) {
1✔
928
                return true, nil
1✔
929
        }
1✔
930
        saName := "default"
931
        if dataImportCron.Spec.ServiceAccountName != nil {
932
                saName = *dataImportCron.Spec.ServiceAccountName
1✔
933
        }
2✔
934
        if resp, err := dv.AuthorizeSA(dv.Namespace, dv.Name, &authProxy{r.client}, dataImportCron.Namespace, saName); err != nil {
1✔
935
                return false, err
1✔
936
        } else if !resp.Allowed {
1✔
937
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
1✔
938
                return false, nil
×
939
        }
×
940

1✔
941
        return true, nil
×
942
}
2✔
943

1✔
944
// authProxy wraps a controller-runtime client so it can be handed to
// DataVolume.AuthorizeSA, which uses it to create SubjectAccessReviews and to
// look up Namespaces and DataSources.
type authProxy struct {
	// client performs the API calls issued by the proxy methods.
	client client.Client
}
947

1✔
948
// CreateSar submits the given SubjectAccessReview through the wrapped client and
// returns the same object after the create call, or nil and the client error.
func (p *authProxy) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
	err := p.client.Create(context.TODO(), sar)
	if err != nil {
		return nil, err
	}
	return sar, nil
}
954

1✔
955
func (p *authProxy) GetNamespace(name string) (*corev1.Namespace, error) {
1✔
956
        ns := &corev1.Namespace{}
×
957
        if err := p.client.Get(context.TODO(), types.NamespacedName{Name: name}, ns); err != nil {
×
958
                return nil, err
1✔
959
        }
960
        return ns, nil
961
}
1✔
962

1✔
963
func (p *authProxy) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
1✔
964
        das := &cdiv1.DataSource{}
×
965
        if err := p.client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, das); err != nil {
×
966
                return nil, err
1✔
967
        }
968
        return das, nil
969
}
×
970

×
971
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
×
972
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
×
973
        if !ok {
×
974
                // nothing to delete
×
975
                return nil
976
        }
977
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
978
        if err != nil {
1✔
979
                return err
1✔
980
        }
×
981
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
×
982
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
×
983
        for _, src := range sources {
1✔
984
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
985
                        return err
×
986
                }
×
987
        }
1✔
988
        for _, src := range sources {
1✔
989
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
2✔
990
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
1✔
991
                }
×
992
        }
×
993
        // Only update desired storage class once garbage collection went through
994
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
2✔
995
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
996
        if err != nil {
×
997
                return err
×
998
        }
999

1000
        return nil
1✔
1001
}
1✔
1002

1✔
1003
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
×
1004
        switch format {
×
1005
        case cdiv1.DataImportCronSourceFormatPvc:
1006
                return nil
1✔
1007
        case cdiv1.DataImportCronSourceFormatSnapshot:
1008
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1009
        default:
1✔
1010
                return fmt.Errorf("unknown source format for snapshot")
1✔
1011
        }
1✔
1012
}
1✔
1013

1✔
1014
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
1015
        if pvc == nil {
×
1016
                return nil
×
1017
        }
1018
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
1019
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1020
                return nil
1✔
1021
        }
1✔
1022
        storageProfile := &cdiv1.StorageProfile{}
×
1023
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
×
1024
                return err
2✔
1025
        }
1✔
1026

1✔
1027
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
1028
                ObjectMeta: metav1.ObjectMeta{
1✔
1029
                        Name:      pvc.Name,
1✔
UNCOV
1030
                        Namespace: dataImportCron.Namespace,
×
1031
                        Labels: map[string]string{
×
1032
                                common.CDILabelKey:       common.CDILabelValue,
1033
                                common.CDIComponentLabel: "",
1✔
1034
                        },
1✔
1035
                },
1✔
1036
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1037
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1038
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1039
                        },
1✔
1040
                },
1✔
1041
        }
1✔
1042
        // Select VolumeSnapshotClass for boot source snapshot
1✔
1043
        snapshotClassName, err := r.getSnapshotClassForDataImportCron(pvc, storageProfile)
1✔
1044
        if err != nil {
1✔
1045
                return err
1✔
1046
        }
1✔
1047
        if snapshotClassName != "" {
1✔
1048
                desiredSnapshot.Spec.VolumeSnapshotClassName = &snapshotClassName
1✔
1049
        }
1✔
1050
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
UNCOV
1051
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
×
UNCOV
1052

×
1053
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
2✔
1054
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
1✔
1055
                if !k8serrors.IsNotFound(err) {
1✔
1056
                        return err
1✔
1057
                }
1✔
1058
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1059
                if pvc.Spec.VolumeMode != nil {
1✔
1060
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
2✔
1061
                }
1✔
1062
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
×
1063
                        return err
×
1064
                }
1✔
1065
        } else {
1✔
1066
                if cc.IsSnapshotReady(currentSnapshot) {
1✔
1067
                        // Clean up DV/PVC as they are not needed anymore
2✔
1068
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1069
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
1070
                                return err
2✔
1071
                        }
1✔
1072
                }
1✔
1073
        }
1✔
1074

×
1075
        return nil
×
1076
}
1✔
1077

2✔
1078
func (r *DataImportCronReconciler) handleSnapshotClassChange(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot, storageClassName string) (bool, error) {
1✔
1079
        sp := &cdiv1.StorageProfile{}
1✔
1080
        if err := r.client.Get(ctx, types.NamespacedName{Name: storageClassName}, sp); err != nil {
1✔
NEW
1081
                return false, client.IgnoreNotFound(err)
×
NEW
1082
        }
×
1083

1084
        desiredVSC, err := r.getSnapshotClassForDataImportCron(nil, sp)
1085
        if err != nil {
1086
                return false, err
1✔
1087
        }
1088
        actualVSC := ""
1089
        if snapshot.Spec.VolumeSnapshotClassName != nil {
1✔
1090
                actualVSC = *snapshot.Spec.VolumeSnapshotClassName
1✔
1091
        }
1✔
NEW
1092
        if desiredVSC == "" || actualVSC == desiredVSC {
×
NEW
1093
                return false, nil
×
1094
        }
1095

1✔
1096
        r.log.Info("Snapshot class changed, deleting", "name", snapshot.Name, "from", actualVSC, "to", desiredVSC)
1✔
NEW
1097
        if err := r.client.Delete(ctx, snapshot); err != nil {
×
NEW
1098
                return false, client.IgnoreNotFound(err)
×
1099
        }
1✔
1100
        return true, nil
1✔
NEW
1101
}
×
NEW
1102

×
1103
// getSnapshotClassForDataImportCron returns the VolumeSnapshotClass name to use for DataImportCron snapshots.
2✔
1104
// Returns empty string if no snapshot class is found.
1✔
1105
func (r *DataImportCronReconciler) getSnapshotClassForDataImportCron(pvc *corev1.PersistentVolumeClaim, storageProfile *cdiv1.StorageProfile) (string, error) {
1✔
1106
        if vscName := storageProfile.Annotations[cc.AnnSnapshotClassForDataImportCron]; vscName != "" {
NEW
1107
                return vscName, nil
×
NEW
1108
        }
×
NEW
1109
        return cc.GetSnapshotClassForSmartClone(pvc, &storageProfile.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
×
NEW
1110
}
×
NEW
1111

×
1112
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1113
        dataImportCron.Status.SourceFormat = &format
1114

1115
        switch format {
1116
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1117
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
2✔
1118
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1119
                if snapshot == nil {
1✔
1120
                        // Snapshot create/update will trigger reconcile
1✔
1121
                        return nil
1122
                }
1123
                if cc.IsSnapshotReady(snapshot) {
1✔
1124
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1125
                } else {
1✔
1126
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1127
                }
1✔
1128
        default:
1✔
1129
                return fmt.Errorf("unknown source format for snapshot")
1✔
1130
        }
2✔
1131

1✔
1132
        return nil
1✔
1133
}
1✔
1134

2✔
1135
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1136
        format := cdiv1.DataImportCronSourceFormatPvc
2✔
1137
        if desiredStorageClass == nil {
1✔
1138
                return format, nil
1✔
1139
        }
×
1140

×
1141
        storageProfile := &cdiv1.StorageProfile{}
1142
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1143
                return format, err
1✔
1144
        }
1145
        if storageProfile.Status.DataImportCronSourceFormat != nil {
1146
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1147
        }
1✔
1148

2✔
1149
        return format, nil
1✔
1150
}
1✔
1151

1152
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1153
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1154
                return nil
×
1155
        }
×
1156
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
2✔
1157
        if err != nil {
1✔
1158
                return err
1✔
1159
        }
1160

1✔
1161
        maxImports := defaultImportsToKeepPerCron
1162

1163
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
1✔
1164
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1165
        }
×
1166

×
1167
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1168
                return err
1✔
1169
        }
×
1170
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
×
1171
                return err
1172
        }
1✔
1173

1✔
1174
        return nil
2✔
1175
}
1✔
1176

1✔
1177
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1178
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1179

×
1180
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
×
1181
                return err
1✔
1182
        }
×
1183
        if len(pvcList.Items) > maxImports {
×
1184
                sort.Slice(pvcList.Items, func(i, j int) bool {
1185
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1186
                })
1187
                for _, pvc := range pvcList.Items[maxImports:] {
1188
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1189
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1190
                                return err
1✔
1191
                        }
1✔
1192
                }
×
1193
        }
×
1194

2✔
1195
        dvList := &cdiv1.DataVolumeList{}
2✔
1196
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1197
                return err
1✔
1198
        }
2✔
1199

1✔
1200
        if len(dvList.Items) > maxImports {
1✔
1201
                for _, dv := range dvList.Items {
×
1202
                        pvc := &corev1.PersistentVolumeClaim{}
×
1203
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1204
                                return err
1205
                        }
1206

1✔
1207
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
1✔
1208
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
×
1209
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
×
1210
                                        return err
1211
                                }
2✔
1212
                        }
2✔
1213
                }
1✔
1214
        }
1✔
1215

×
1216
        return nil
×
1217
}
1218

2✔
1219
// deleteDvPvc deletes DV or PVC if DV was GCed
1✔
1220
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1221
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
×
1222
        dv := &cdiv1.DataVolume{ObjectMeta: om}
×
1223
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
1224
                return err
1225
        }
1226
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1227
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1228
                return err
1229
        }
1230
        return nil
1231
}
1✔
1232

1✔
1233
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1234
        snapList := &snapshotv1.VolumeSnapshotList{}
2✔
1235

1✔
1236
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1237
                if meta.IsNoMatchError(err) {
1✔
1238
                        return nil
1✔
1239
                }
×
1240
                return err
×
1241
        }
1✔
1242
        if len(snapList.Items) > maxImports {
1243
                sort.Slice(snapList.Items, func(i, j int) bool {
1244
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
1✔
1245
                })
1✔
1246
                for _, snap := range snapList.Items[maxImports:] {
1✔
1247
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
1✔
1248
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1249
                                return err
×
1250
                        }
×
1251
                }
×
1252
        }
1253

1✔
1254
        return nil
×
1255
}
×
1256

×
1257
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
×
1258
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
×
1259
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
×
1260

×
1261
        if err := r.deleteJobs(ctx, cron); err != nil {
×
1262
                return err
1263
        }
1264
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1265
        if err != nil {
1✔
1266
                return err
1267
        }
1268
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1269
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1270
                return err
1✔
1271
        }
1✔
1272
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1273
                return err
×
1274
        }
×
1275
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1276
                return err
1✔
1277
        }
×
1278
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
×
1279
                return err
1✔
1280
        }
1✔
1281
        return nil
×
1282
}
×
1283

1✔
1284
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
×
1285
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
×
1286
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1287
        if err != nil {
×
1288
                return err
×
1289
        }
1✔
1290
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
×
1291
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
×
1292
                return err
1✔
1293
        }
1294
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1295
                return err
1✔
1296
        }
1✔
1297

1✔
1298
        return nil
1✔
1299
}
×
1300

×
1301
// NewDataImportCronController creates a new instance of the DataImportCron controller
1✔
1302
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
1✔
1303
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1304
                Scheme: mgr.GetScheme(),
×
1305
                Mapper: mgr.GetRESTMapper(),
1✔
1306
        })
×
1307
        if err != nil {
×
1308
                return nil, err
1309
        }
1✔
1310
        reconciler := &DataImportCronReconciler{
1311
                client:          mgr.GetClient(),
1312
                uncachedClient:  uncachedClient,
1313
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1314
                scheme:          mgr.GetScheme(),
×
1315
                log:             log.WithName(dataImportControllerName),
×
1316
                image:           importerImage,
×
1317
                pullPolicy:      pullPolicy,
×
1318
                cdiNamespace:    util.GetNamespace(),
×
1319
                installerLabels: installerLabels,
×
1320
        }
×
1321
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1322
                MaxConcurrentReconciles: 3,
×
1323
                Reconciler:              reconciler,
×
1324
        })
×
1325
        if err != nil {
×
1326
                return nil, err
×
1327
        }
×
1328
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1329
                return nil, err
×
1330
        }
×
1331
        log.Info("Initialized DataImportCron controller")
×
1332
        return dataImportCronController, nil
×
1333
}
×
1334

×
1335
func getCronName(obj client.Object) string {
×
1336
        return obj.GetLabels()[common.DataImportCronLabel]
×
1337
}
×
1338

×
1339
func getCronNs(obj client.Object) string {
×
1340
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1341
}
×
1342

×
1343
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1344
        if cronName := getCronName(obj); cronName != "" {
1345
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
1346
        }
×
1347
        return nil
×
1348
}
×
1349

1350
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1351
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1352
                return err
×
1353
        }
1354

×
1355
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1356
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1357
                // Otherwise we risk losing the storage profile event
×
1358
                var crons cdiv1.DataImportCronList
×
1359
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
1360
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
1361
                        return nil
×
1362
                }
×
1363
                // Storage profiles are 1:1 to storage classes
×
1364
                scName := obj.GetName()
×
1365
                var reqs []reconcile.Request
1366
                for _, cron := range crons.Items {
×
1367
                        dataVolume := cron.Spec.Template
×
1368
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1369
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1370
                        if err != nil || templateSc == nil {
×
1371
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1372
                                return reqs
×
1373
                        }
×
1374
                        if templateSc.Name == scName {
1375
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1376
                        }
×
1377
                }
×
1378
                return reqs
×
1379
        }
×
1380

×
1381
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1382
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1383
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1384
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1385
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1386
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1387
                },
×
1388
        )); err != nil {
1389
                return err
×
1390
        }
1391

1392
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1393
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1394
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1395
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1396
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1397
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1398
                },
1399
        )); err != nil {
×
1400
                return err
×
1401
        }
×
1402

1403
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1404
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1405
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1406
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1407
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1408
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1409
                },
1410
        )); err != nil {
×
1411
                return err
×
1412
        }
×
1413

1414
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1415
                return err
×
1416
        }
×
1417

×
1418
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1419
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1420
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
1421
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1422
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1423
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1424
                                return dicRelevantFieldsChanged(e.ObjectOld, e.ObjectNew)
1425
                        },
×
1426
                },
×
1427
        )); err != nil {
×
1428
                return err
1429
        }
×
1430

×
1431
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1432
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1433
        }
×
1434

×
1435
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1436
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1437
                predicate.TypedFuncs[*batchv1.CronJob]{
1438
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1439
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1440
                        },
×
1441
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
1442
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1443
                },
×
1444
        )); err != nil {
×
1445
                return err
1446
        }
×
1447

×
1448
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1449
                if meta.IsNoMatchError(err) {
×
1450
                        // Back out if there's no point to attempt watch
×
1451
                        return nil
×
1452
                }
×
1453
                if !cc.IsErrCacheNotStarted(err) {
×
1454
                        return err
1455
                }
×
1456
        }
×
1457
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1458
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
1459
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1460
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1461
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1462
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1463
                },
×
1464
        )); err != nil {
×
1465
                return err
×
1466
        }
×
1467

1468
        return nil
×
1469
}
×
1470

×
NEW
1471
func dicRelevantFieldsChanged(oldSp, newSp *cdiv1.StorageProfile) bool {
×
NEW
1472
        sourceFormatChanged := oldSp.Status.DataImportCronSourceFormat != newSp.Status.DataImportCronSourceFormat
×
NEW
1473
        rwoAnnotationChanged := oldSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] != newSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron]
×
1474
        snapshotClassChanged := oldSp.Annotations[cc.AnnSnapshotClassForDataImportCron] != newSp.Annotations[cc.AnnSnapshotClassForDataImportCron]
NEW
1475
        return sourceFormatChanged || rwoAnnotationChanged || snapshotClassChanged
×
NEW
1476
}
×
NEW
1477

×
1478
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1479
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1480
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
1481
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
1482
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1483
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1484
                                log.Info("Update", "sc", obj.GetName(),
×
UNCOV
1485
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1486
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
UNCOV
1487
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1488
                                if err != nil {
1489
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
1490
                                }
×
1491
                                return reqs
×
1492
                        },
×
1493
                ),
×
1494
                predicate.TypedFuncs[*storagev1.StorageClass]{
×
1495
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1496
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1497
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1498
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1499
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1500
                        },
×
1501
                },
×
1502
        )); err != nil {
×
1503
                return err
1504
        }
1505

1506
        return nil
×
1507
}
×
1508

×
1509
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1510
        dicList := &cdiv1.DataImportCronList{}
×
1511
        if err := c.List(ctx, dicList); err != nil {
×
1512
                return nil, err
1513
        }
×
1514
        reqs := []reconcile.Request{}
×
1515
        for _, dic := range dicList.Items {
×
1516
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
1517
                        continue
×
1518
                }
1519

1520
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1521
        }
×
1522

×
1523
        return reqs, nil
×
1524
}
×
1525

×
1526
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
×
1527
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
×
1528
                return false, nil
×
1529
        }
1530

1531
        sc := pvc.Spec.StorageClassName
×
1532
        if sc == nil || *sc == desiredStorageClass {
1533
                return false, nil
1534
        }
×
1535

1536
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1537
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1538
                return false, err
2✔
1539
        }
1✔
1540

1✔
1541
        return true, nil
1542
}
1✔
1543

2✔
1544
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1545
        cronJob := &batchv1.CronJob{}
1✔
1546
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1547
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
1✔
1548
                return false, cc.IgnoreNotFound(err)
1✔
1549
        }
×
1550

×
1551
        cronJobCopy := cronJob.DeepCopy()
1552
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1553
                return false, err
1554
        }
1555

1✔
1556
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
1✔
1557
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1558
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
2✔
1559
                        return false, cc.IgnoreNotFound(err)
1✔
1560
                }
1✔
1561
        }
1562
        return true, nil
1✔
1563
}
1✔
1564

×
1565
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
×
1566
        cronJob := &batchv1.CronJob{
1567
                ObjectMeta: metav1.ObjectMeta{
2✔
1568
                        Name:      GetCronJobName(cron),
1✔
1569
                        Namespace: r.cdiNamespace,
1✔
1570
                },
×
1571
        }
×
1572
        if err := r.initCronJob(cron, cronJob); err != nil {
1573
                return nil, err
1✔
1574
        }
1575
        return cronJob, nil
1576
}
1✔
1577

1✔
1578
// InitPollerPod inits poller Pod
1✔
1579
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1580
        regSource, err := getCronRegistrySource(cron)
1✔
1581
        if err != nil {
1✔
1582
                return err
1✔
1583
        }
1✔
1584
        if regSource.URL == nil {
×
1585
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1586
        }
1✔
1587
        cdiConfig := &cdiv1.CDIConfig{}
1588
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1589
                return err
1590
        }
1✔
1591
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1592
        if err != nil {
1✔
1593
                return err
×
1594
        }
×
1595
        container := corev1.Container{
1✔
1596
                Name:  "cdi-source-update-poller",
×
1597
                Image: image,
×
1598
                Command: []string{
1✔
1599
                        "/usr/bin/cdi-source-update-poller",
1✔
1600
                        "-ns", cron.Namespace,
×
1601
                        "-cron", cron.Name,
×
1602
                        "-url", *regSource.URL,
1✔
1603
                },
1✔
1604
                ImagePullPolicy:          pullPolicy,
×
1605
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
×
1606
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1607
        }
1✔
1608

1✔
1609
        var volumes []corev1.Volume
1✔
1610
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1611
        if hasCertConfigMap {
1✔
1612
                vm := corev1.VolumeMount{
1✔
1613
                        Name:      CertVolName,
1✔
1614
                        MountPath: common.ImporterCertDir,
1✔
1615
                }
1✔
1616
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1617
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
1✔
1618
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
1✔
1619
        }
1✔
1620

1✔
1621
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
1✔
1622
                vm := corev1.VolumeMount{
1✔
1623
                        Name:      ProxyCertVolName,
×
1624
                        MountPath: common.ImporterProxyCertDir,
×
1625
                }
×
1626
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1627
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
×
1628
        }
×
1629

×
1630
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
×
1631
                container.Env = append(container.Env,
1632
                        corev1.EnvVar{
2✔
1633
                                Name: common.ImporterAccessKeyID,
1✔
1634
                                ValueFrom: &corev1.EnvVarSource{
1✔
1635
                                        SecretKeyRef: &corev1.SecretKeySelector{
1✔
1636
                                                LocalObjectReference: corev1.LocalObjectReference{
1✔
1637
                                                        Name: *regSource.SecretRef,
1✔
1638
                                                },
1✔
1639
                                                Key: common.KeyAccess,
1✔
1640
                                        },
1641
                                },
1✔
1642
                        },
×
1643
                        corev1.EnvVar{
×
1644
                                Name: common.ImporterSecretKey,
×
1645
                                ValueFrom: &corev1.EnvVarSource{
×
1646
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1647
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1648
                                                        Name: *regSource.SecretRef,
×
1649
                                                },
×
1650
                                                Key: common.KeySecret,
×
1651
                                        },
×
1652
                                },
×
1653
                        },
×
1654
                )
×
1655
        }
×
1656

×
1657
        addEnvVar := func(varName, value string) {
×
1658
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
×
1659
        }
×
1660

×
1661
        if insecureTLS {
×
1662
                addEnvVar(common.InsecureTLSVar, "true")
×
1663
        }
×
1664

×
1665
        addEnvVarFromImportProxyConfig := func(varName string) {
×
1666
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
×
1667
                        addEnvVar(varName, value)
1668
                }
2✔
1669
        }
1✔
1670

1✔
1671
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1672
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1673
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
×
1674

×
1675
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1676
        if err != nil {
2✔
1677
                return err
2✔
1678
        }
1✔
1679
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1680
        if err != nil {
1681
                return err
1682
        }
1✔
1683

1✔
1684
        podSpec := &pod.Spec
1✔
1685

1✔
1686
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1687
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1688
        podSpec.Containers = []corev1.Container{container}
×
1689
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
×
1690
        podSpec.Volumes = volumes
1✔
1691
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1692
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
×
1693
        podSpec.Tolerations = workloadNodePlacement.Tolerations
×
1694
        podSpec.Affinity = workloadNodePlacement.Affinity
1695

1✔
1696
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1697
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1698
        if podSpec.SecurityContext != nil {
1✔
1699
                podSpec.SecurityContext.FSGroup = nil
1✔
1700
        }
1✔
1701
        if podSpec.Containers[0].SecurityContext != nil {
1✔
1702
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1703
        }
1✔
1704

1✔
1705
        if pod.Labels == nil {
1✔
1706
                pod.Labels = map[string]string{}
1✔
1707
        }
1✔
1708
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1709

2✔
1710
        return nil
1✔
1711
}
1✔
1712

2✔
1713
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1714
        cronJobSpec := &cronJob.Spec
1✔
1715
        cronJobSpec.Schedule = cron.Spec.Schedule
1716
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
2✔
1717
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1718
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1719

1✔
1720
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1721
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1722
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1723
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1724

1✔
1725
        pod := &jobSpec.Template
1✔
1726
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1727
                return err
1✔
1728
        }
1✔
1729
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1730
                return err
1✔
1731
        }
1✔
1732
        return nil
1✔
1733
}
1✔
1734

1✔
1735
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1736
        job := &batchv1.Job{
1✔
1737
                ObjectMeta: metav1.ObjectMeta{
1✔
1738
                        Name:      GetInitialJobName(cron),
×
1739
                        Namespace: cronJob.Namespace,
×
1740
                },
1✔
1741
                Spec: cronJob.Spec.JobTemplate.Spec,
×
1742
        }
×
1743
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1744
                return nil, err
1745
        }
1746
        return job, nil
1✔
1747
}
1✔
1748

1✔
1749
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1750
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1751
                return err
1✔
1752
        }
1✔
1753
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1754
        labels := obj.GetLabels()
1✔
1755
        labels[common.DataImportCronNsLabel] = cron.Namespace
×
1756
        labels[common.DataImportCronLabel] = cron.Name
×
1757
        obj.SetLabels(labels)
1✔
1758
        return nil
1759
}
1760

1✔
1761
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string, storageProfile *cdiv1.StorageProfile) *cdiv1.DataVolume {
1✔
1762
        dv := cron.Spec.Template.DeepCopy()
×
1763
        if isCronRegistrySource(cron) {
×
1764
                var digestedURL string
1✔
1765
                if isURLSource(cron) {
1✔
1766
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1767
                } else if isImageStreamSource(cron) {
1✔
1768
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1769
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1770
                        dv.Spec.Source.Registry.ImageStream = nil
1771
                }
1772
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1773
        }
1✔
1774
        dv.Name = dataVolumeName
2✔
1775
        dv.Namespace = cron.Namespace
1✔
1776
        r.setDataImportCronResourceLabels(cron, dv)
2✔
1777
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1778
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
3✔
1779
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1780

1✔
1781
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
1✔
1782
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1783
        }
1✔
1784

1785
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1786

1✔
1787
        // Apply RWO access mode as default for DataImportCron (from StorageProfile annotation)
1✔
1788
        // Only applies if the DV doesn't already have AccessModes configured
1✔
1789
        if storageProfile != nil && storageProfile.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] == "true" {
1✔
1790
                if dv.Spec.Storage != nil && len(dv.Spec.Storage.AccessModes) == 0 {
1✔
1791
                        dv.Spec.Storage.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
1✔
1792
                }
2✔
1793
        }
1✔
1794

1✔
1795
        return dv
1796
}
1✔
1797

1✔
1798
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1799
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1800
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
2✔
1801
        labels := obj.GetLabels()
2✔
1802
        labels[common.DataImportCronLabel] = cron.Name
1✔
1803
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
1✔
1804
                labels[common.DataImportCronCleanupLabel] = "true"
1805
        }
1806
        obj.SetLabels(labels)
1✔
1807
}
1808

1809
func untagDigestedDockerURL(dockerURL string) string {
1✔
1810
        if u, err := url.Parse(dockerURL); err == nil {
1✔
1811
                url := u.Host + u.Path
1✔
1812
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1813
                // Check for tag
1✔
1814
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1815
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
1✔
1816
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1817
                        }
1✔
1818
                }
1819
        }
1820
        return dockerURL
1✔
1821
}
2✔
1822

1✔
1823
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1824
        if val := cron.Labels[ann]; val != "" {
1✔
1825
                cc.AddLabel(dv, ann, val)
2✔
1826
        }
2✔
1827
}
1✔
1828

1✔
1829
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1830
        if val := cron.Annotations[ann]; val != "" {
1831
                cc.AddAnnotation(dv, ann, val)
1✔
1832
        }
1833
}
1834

1✔
1835
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
2✔
1836
        dataSource := &cdiv1.DataSource{
1✔
1837
                ObjectMeta: metav1.ObjectMeta{
1✔
1838
                        Name:      cron.Spec.ManagedDataSource,
1839
                        Namespace: cron.Namespace,
1840
                },
1✔
1841
        }
1✔
1842
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
×
1843
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
×
1844
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1845
        return dataSource
1846
}
1✔
1847

1✔
1848
// Create DataVolume name based on the DataSource name + prefix of the digest
1✔
1849
func createDvName(prefix, digest string) (string, error) {
1✔
1850
        digestPrefix := ""
1✔
1851
        if strings.HasPrefix(digest, digestSha256Prefix) {
1✔
1852
                digestPrefix = digestSha256Prefix
1✔
1853
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
1✔
1854
                digestPrefix = digestUIDPrefix
1✔
1855
        } else {
1✔
1856
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1857
        }
1✔
1858
        fromIdx := len(digestPrefix)
1859
        toIdx := fromIdx + digestDvNameSuffixLength
1860
        if len(digest) < toIdx {
1✔
1861
                return "", errors.Errorf("Digest is too short")
1✔
1862
        }
2✔
1863
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1864
}
3✔
1865

1✔
1866
// GetCronJobName get CronJob name based on cron name and UID
2✔
1867
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1868
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1869
}
1✔
1870

1✔
1871
// GetInitialJobName get initial job name based on cron name and UID
2✔
1872
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1873
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1874
}
1✔
1875

1876
// getPollerPodName returns the poller pod name for cron, built by
// naming.GetResourceName from "poller-<cron name>" and the first 8 characters
// of the cron's UID. If the UID is shorter than 8 characters, the whole UID is
// used instead of slicing past its end, which would panic.
func getPollerPodName(cron *cdiv1.DataImportCron) string {
	const uidSuffixLength = 8

	uidSuffix := string(cron.UID)
	if len(uidSuffix) > uidSuffixLength {
		uidSuffix = uidSuffix[:uidSuffixLength]
	}
	return naming.GetResourceName("poller-"+cron.Name, uidSuffix)
}
1✔
1879

1✔
1880
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1881
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1882
}
1883

1✔
1884
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1885
        dv := &cron.Spec.Template
1✔
1886

1887
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1888
                return explicitVolumeMode, nil
1✔
1889
        }
1✔
1890

1891
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1892
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1893
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1894
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1895
                        AccessModes:      accessModes,
1✔
1896
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1897
                        Resources: corev1.VolumeResourceRequirements{
1✔
1898
                                Requests: corev1.ResourceList{
1✔
1899
                                        // Doesn't matter
×
1900
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
×
1901
                                },
1902
                        },
1✔
1903
                },
1✔
1904
        }
1✔
1905
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1906
                return nil, err
1✔
1907
        }
1✔
1908

1✔
1909
        return inferredPvc.Spec.VolumeMode, nil
1✔
1910
}
1✔
1911

1✔
1912
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1✔
1913
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1914
        if dv.Spec.PVC != nil {
1✔
1915
                return dv.Spec.PVC.VolumeMode
1✔
1916
        }
1✔
1917

×
1918
        if dv.Spec.Storage != nil {
×
1919
                return dv.Spec.Storage.VolumeMode
1920
        }
1✔
1921

1922
        return nil
1923
}
1924

1✔
1925
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1✔
1926
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
×
1927
        if dv.Spec.PVC != nil {
×
1928
                return dv.Spec.PVC.AccessModes
1929
        }
2✔
1930

1✔
1931
        if dv.Spec.Storage != nil {
1✔
1932
                return dv.Spec.Storage.AccessModes
1933
        }
×
1934

1935
        return nil
1936
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc