• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #6091

01 Jul 2026 07:39AM UTC coverage: 49.73% (-0.05%) from 49.781%
#6091

Pull #4074

travis-ci

noamasu
Fix DataImportCron reconciliation when first default StorageClass is set

The default SC change condition only handled changes from one SC to another, but did not handle the first default SC being set.

To get the DIC-created DV with the DIC-special RWO preference, it must be deleted (as DV is immutable) and recreated with the right settings from the new default StorageClass.

Signed-off-by: Noam Assouline <nassouli@redhat.com>
Pull Request #4074: Recreate DIC import DV when first default SC is set

30 of 32 new or added lines in 1 file covered. (93.75%)

21 existing lines in 4 files now uncovered.

15125 of 30414 relevant lines covered (49.73%)

0.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.98
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35
        authorizationv1 "k8s.io/api/authorization/v1"
36
        batchv1 "k8s.io/api/batch/v1"
37
        corev1 "k8s.io/api/core/v1"
38
        storagev1 "k8s.io/api/storage/v1"
39
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
40
        "k8s.io/apimachinery/pkg/api/meta"
41
        "k8s.io/apimachinery/pkg/api/resource"
42
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
43
        "k8s.io/apimachinery/pkg/labels"
44
        "k8s.io/apimachinery/pkg/runtime"
45
        "k8s.io/apimachinery/pkg/types"
46
        "k8s.io/client-go/tools/record"
47
        openapicommon "k8s.io/kube-openapi/pkg/common"
48
        "k8s.io/utils/ptr"
49
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
50
        "kubevirt.io/containerized-data-importer/pkg/common"
51
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
52
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
53
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
54
        "kubevirt.io/containerized-data-importer/pkg/operator"
55
        "kubevirt.io/containerized-data-importer/pkg/util"
56
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
57
        "sigs.k8s.io/controller-runtime/pkg/client"
58
        "sigs.k8s.io/controller-runtime/pkg/controller"
59
        "sigs.k8s.io/controller-runtime/pkg/event"
60
        "sigs.k8s.io/controller-runtime/pkg/handler"
61
        "sigs.k8s.io/controller-runtime/pkg/manager"
62
        "sigs.k8s.io/controller-runtime/pkg/predicate"
63
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
64
        "sigs.k8s.io/controller-runtime/pkg/source"
65
)
66

67
const (
68
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
69
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
70
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
71
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
72
)
73

74
// DataImportCronReconciler members.
// The reconcile methods in this file hang off this type; it carries the
// Kubernetes client, event recorder and logger they use.
75
type DataImportCronReconciler struct {
76
        // client performs the Get/Create/Update/Delete calls made while reconciling.
        client          client.Client
77
        // NOTE(review): presumably a client that bypasses the cache; not used in this part of the file - confirm.
        uncachedClient  client.Client
78
        // recorder emits Kubernetes events on the DataImportCron (e.g. ErrDataSourceAlreadyManaged).
        recorder        record.EventRecorder
79
        scheme          *runtime.Scheme
80
        // log is the base logger; methods derive named/valued loggers from it.
        log             logr.Logger
81
        // NOTE(review): image and pullPolicy look like container settings for jobs/pods created by the controller - confirm.
        image           string
82
        pullPolicy      string
83
        cdiNamespace    string
84
        installerLabels map[string]string
85
}
86

87
const (
88
        // AnnSourceDesiredDigest is the digest of the pending updated image
89
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
90
        // AnnImageStreamDockerRef is the ImageStream Docker reference
91
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
92
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
93
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
94
        // AnnLastCronTime is the cron last execution time stamp
95
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
96
        // AnnLastUseTime is the PVC last use time stamp
97
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
98
        // AnnStorageClass is the cron DV's storage class
99
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
100

101
        dataImportControllerName    = "dataimportcron-controller"
102
        digestSha256Prefix          = "sha256:"
103
        digestUIDPrefix             = "uid:"
104
        digestDvNameSuffixLength    = 12
105
        cronJobUIDSuffixLength      = 8
106
        defaultImportsToKeepPerCron = 3
107
)
108

109
// ErrNotManagedByCron is returned by getDataSource when the DataSource's
// DataImportCron label does not name the requesting DataImportCron.
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
110

111
// Reconcile loop for DataImportCronReconciler
112
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
113
        dataImportCron := &cdiv1.DataImportCron{}
1✔
114
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
115
                return reconcile.Result{}, err
1✔
116
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
117
                err := r.cleanup(ctx, req.NamespacedName)
1✔
118
                return reconcile.Result{}, err
1✔
119
        }
1✔
120
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
121
        if !shouldReconcile || err != nil {
1✔
122
                return reconcile.Result{}, err
×
123
        }
×
124

125
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
126
                return reconcile.Result{}, err
×
127
        }
×
128

129
        return r.update(ctx, dataImportCron)
1✔
130
}
131

132
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
133
        dataSource := &cdiv1.DataSource{}
1✔
134
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
135
                if k8serrors.IsNotFound(err) {
2✔
136
                        return true, nil
1✔
137
                }
1✔
138
                return false, err
×
139
        }
140
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
141
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
142
                return true, nil
1✔
143
        }
1✔
144
        otherCron := &cdiv1.DataImportCron{}
1✔
145
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
146
                if k8serrors.IsNotFound(err) {
2✔
147
                        return true, nil
1✔
148
                }
1✔
149
                return false, err
×
150
        }
151
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
152
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
153
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
154
                r.log.V(3).Info(msg)
1✔
155
                return false, nil
1✔
156
        }
1✔
157
        return true, nil
×
158
}
159

160
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
161
        if dataImportCron.Spec.Schedule == "" {
2✔
162
                return nil
1✔
163
        }
1✔
164
        if isControllerPolledSource(dataImportCron) {
2✔
165
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
166
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
167
                }
1✔
168
                return nil
1✔
169
        }
170
        if !isURLSource(dataImportCron) {
1✔
171
                return nil
×
172
        }
×
173
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
174
        if exists || err != nil {
2✔
175
                return err
1✔
176
        }
1✔
177
        cronJob, err := r.newCronJob(dataImportCron)
1✔
178
        if err != nil {
1✔
179
                return err
×
180
        }
×
181
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
182
                return err
×
183
        }
×
184
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
185
        if err != nil {
1✔
186
                return err
×
187
        }
×
188
        if err := r.client.Create(ctx, job); err != nil {
1✔
189
                return err
×
190
        }
×
191
        return nil
1✔
192
}
193

194
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
195
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
196
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
197
        }
×
198
        imageStream := &imagev1.ImageStream{}
1✔
199
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
200
        if err != nil {
2✔
201
                return nil, "", err
1✔
202
        }
1✔
203
        imageStreamNamespacedName := types.NamespacedName{
1✔
204
                Namespace: imageStreamNamespace,
1✔
205
                Name:      name,
1✔
206
        }
1✔
207
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
208
                return nil, "", err
1✔
209
        }
1✔
210
        return imageStream, tag, nil
1✔
211
}
212

213
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
214
        if imageStream == nil {
1✔
215
                return "", "", errors.Errorf("No ImageStream")
×
216
        }
×
217
        tags := imageStream.Status.Tags
1✔
218
        if len(tags) == 0 {
1✔
219
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
220
        }
×
221

222
        tagIdx := 0
1✔
223
        if len(imageStreamTag) > 0 {
2✔
224
                tagIdx = -1
1✔
225
                for i, tag := range tags {
2✔
226
                        if tag.Tag == imageStreamTag {
2✔
227
                                tagIdx = i
1✔
228
                                break
1✔
229
                        }
230
                }
231
        }
232
        if tagIdx == -1 {
2✔
233
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
234
        }
1✔
235

236
        if len(tags[tagIdx].Items) == 0 {
2✔
237
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
238
        }
1✔
239
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
240
}
241

242
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
243
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
244
                return imageStreamName, "", nil
1✔
245
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
246
                return subs[0], subs[1], nil
1✔
247
        }
1✔
248
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
249
}
250

251
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
252
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
253
        if nextTimeStr == "" {
1✔
254
                return r.setNextCronTime(dataImportCron)
×
255
        }
×
256
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
257
        if err != nil {
1✔
258
                return reconcile.Result{}, err
×
259
        }
×
260
        if nextTime.After(time.Now()) {
2✔
261
                return r.setNextCronTime(dataImportCron)
1✔
262
        }
1✔
263
        switch {
1✔
264
        case isImageStreamSource(dataImportCron):
1✔
265
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
266
                        return reconcile.Result{}, err
1✔
267
                }
1✔
268
        case isPvcSource(dataImportCron):
1✔
269
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
270
                        return reconcile.Result{}, err
1✔
271
                }
1✔
272
        case isNodePull(dataImportCron):
1✔
273
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
274
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
275
                } else if err != nil {
2✔
276
                        return reconcile.Result{}, err
×
277
                }
×
278
        }
279

280
        return r.setNextCronTime(dataImportCron)
1✔
281
}
282

283
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
284
        now := time.Now()
1✔
285
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
286
        if err != nil {
1✔
287
                return reconcile.Result{}, err
×
288
        }
×
289
        nextTime := expr.Next(now)
1✔
290
        requeueAfter := nextTime.Sub(now)
1✔
291
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
292
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
293
        return res, err
1✔
294
}
295

296
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
297
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
298
        return err == nil && regSource.ImageStream != nil
1✔
299
}
1✔
300

301
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
302
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
303
        return err == nil && regSource.URL != nil
1✔
304
}
1✔
305

306
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
307
        regSource, err := getCronRegistrySource(cron)
1✔
308
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
309
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
310
}
1✔
311

312
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
313
        if !isCronRegistrySource(cron) {
2✔
314
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
315
        }
1✔
316
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
317
}
318

319
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
320
        source := cron.Spec.Template.Spec.Source
1✔
321
        return source != nil && source.Registry != nil
1✔
322
}
1✔
323

324
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
325
        if !isPvcSource(cron) {
1✔
326
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
327
        }
×
328
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
329
}
330

331
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
332
        source := cron.Spec.Template.Spec.Source
1✔
333
        return source != nil && source.PVC != nil
1✔
334
}
1✔
335

336
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
337
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
338
}
1✔
339

340
// update runs one reconcile pass over an existing DataImportCron.
//
// In order, it: loads the DataVolume/PVC of the current import; resolves the
// desired storage class and recreates the import if importOutdated reports it;
// looks at the DataVolume phase, a bound PVC, or a snapshot to decide whether
// the current import succeeded; updates the managed DataSource; polls the
// source digest for controller-polled sources that have a schedule; creates a
// new import DataVolume when the desired digest differs from the current
// import; and finally persists the cron if anything on it changed.
//
// The returned Result carries a RequeueAfter when an import was recreated, a
// stale snapshot was deleted, the snapshot class changed, or polling asked for it.
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
341
        res := reconcile.Result{}
1✔
342

1✔
343
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
344
        if err != nil {
1✔
345
                return res, err
×
346
        }
×
347

348
        // Snapshot of the cron taken before any mutation; compared at the end to decide whether to Update.
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
349
        imports := dataImportCron.Status.CurrentImports
1✔
350
        importSucceeded := false
1✔
351

1✔
352
        // Resolve the storage class for the import from the cron's DataVolume template.
        dataVolume := dataImportCron.Spec.Template
1✔
353
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
354
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
355
        if err != nil {
1✔
356
                return res, err
×
357
        }
×
358
        if desiredStorageClass != nil {
2✔
359
                desiredSc := desiredStorageClass.Name
1✔
360
                outdated, err := r.importOutdated(ctx, dataImportCron, dv, pvc, desiredStorageClass)
1✔
361
                if err != nil {
1✔
UNCOV
362
                        return res, err
×
UNCOV
363
                }
×
364
                if outdated {
2✔
365
                        if err := r.recreateImport(ctx, dataImportCron, desiredSc); err != nil {
1✔
366
                                return res, err
×
367
                        }
×
368
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
369
                }
370
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredSc)
1✔
371
        }
372
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
373
        if err != nil {
1✔
374
                return res, err
×
375
        }
×
376
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
377
        if err != nil {
1✔
378
                return res, err
×
379
        }
×
380

381
        // handlePopulatedPvc marks the import as succeeded; when a PVC exists it is
        // stamped via updateSource first, then handleCronFormat is applied.
        handlePopulatedPvc := func() error {
2✔
382
                if pvc != nil {
2✔
383
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
384
                                return err
×
385
                        }
×
386
                }
387
                importSucceeded = true
1✔
388
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
389
                        return err
×
390
                }
×
391

392
                return nil
1✔
393
        }
394

395
        // Classify the current import: a DataVolume takes precedence, then a bound PVC, then a snapshot.
        switch {
1✔
396
        case dv != nil:
1✔
397
                switch dv.Status.Phase {
1✔
398
                case cdiv1.Succeeded:
1✔
399
                        if err := handlePopulatedPvc(); err != nil {
1✔
400
                                return res, err
×
401
                        }
×
402
                case cdiv1.ImportScheduled:
1✔
403
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
404
                case cdiv1.ImportInProgress:
1✔
405
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
406
                default:
1✔
407
                        dvPhase := string(dv.Status.Phase)
1✔
408
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
409
                }
410
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
411
                if err := handlePopulatedPvc(); err != nil {
1✔
412
                        return res, err
×
413
                }
×
414
        case snapshot != nil:
1✔
415
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
416
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
417
                                return res, err
×
418
                        }
×
419
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
420
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
421
                }
422
                // Check if snapshot class changed
423
                if desiredStorageClass != nil {
2✔
424
                        changed, err := r.handleSnapshotClassChange(ctx, snapshot, desiredStorageClass.Name)
1✔
425
                        if err != nil {
1✔
426
                                return res, err
×
427
                        }
×
428
                        if changed {
1✔
429
                                return reconcile.Result{RequeueAfter: time.Second}, nil
×
430
                        }
×
431
                }
432
                // Below k8s 1.29 there's no way to know the source volume mode
433
                // Let's at least expose this info on our own snapshots
434
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
435
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
436
                        if err != nil {
1✔
437
                                return res, err
×
438
                        }
×
439
                        if volMode != nil {
2✔
440
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
441
                        }
1✔
442
                }
443
                if _, ok := snapshot.Annotations[cc.AnnAdvisedRestoreSize]; !ok {
2✔
444
                        size := inferAdvisedRestoreSizeForSnapshot(&dataImportCron.Spec.Template, snapshot, nil)
1✔
445
                        if size != nil {
2✔
446
                                cc.AddAnnotation(snapshot, cc.AnnAdvisedRestoreSize, size.String())
1✔
447
                        }
1✔
448
                }
449
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
450
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
451
                if err != nil {
2✔
452
                        // A missing DataSource or one not managed by this cron is tolerated; labels are just not copied.
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
453
                                return res, err
×
454
                        }
×
455
                } else {
1✔
456
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
457
                }
1✔
458
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
459
                        return res, err
×
460
                }
×
461
                importSucceeded = true
1✔
462
        // No DataVolume, bound PVC or snapshot backs the current import: drop the first entry of CurrentImports.
        default:
1✔
463
                if len(imports) > 0 {
2✔
464
                        imports = imports[1:]
1✔
465
                        dataImportCron.Status.CurrentImports = imports
1✔
466
                }
1✔
467
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
468
        }
469

470
        if importSucceeded {
2✔
471
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
472
                        return res, err
×
473
                }
×
474
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
475
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
476
                        return res, err
×
477
                }
×
478
        }
479

480
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
481
                return res, err
×
482
        }
×
483

484
        // Skip if schedule is disabled
485
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
486
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
487
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
488
                if err != nil {
2✔
489
                        return pollRes, err
1✔
490
                }
1✔
491
                res = pollRes
1✔
492
        }
493

494
        // A new import is due when a desired digest is set and there is no current import or its digest differs.
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
495
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
496
        if digestUpdated {
2✔
497
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
498
                if dv != nil {
1✔
499
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
500
                                return res, err
×
501
                        }
×
502
                }
503
                // NOTE(review): the local imports slice is not refreshed if deleteErroneousDataVolume cleared Status.CurrentImports above - confirm this is intended.
                if importSucceeded || len(imports) == 0 {
2✔
504
                        if err := r.createImportDataVolume(ctx, dataImportCron, desiredStorageClass); err != nil {
2✔
505
                                return res, err
1✔
506
                        }
1✔
507
                }
508
        } else if importSucceeded {
2✔
509
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
510
                        return res, err
×
511
                }
×
512
        } else if len(imports) > 0 {
2✔
513
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
514
        } else {
2✔
515
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
516
        }
1✔
517

518
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
519
                return res, err
×
520
        }
×
521

522
        // Persist the cron only when this pass actually changed it.
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
523
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
524
                        return res, err
×
525
                }
×
526
        }
527
        return res, nil
1✔
528
}
529

530
// Returns the current import DV if exists, and the last imported PVC
531
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
532
        imports := cron.Status.CurrentImports
1✔
533
        if len(imports) == 0 {
2✔
534
                return nil, nil, nil
1✔
535
        }
1✔
536

537
        dvName := imports[0].DataVolumeName
1✔
538
        dv := &cdiv1.DataVolume{}
1✔
539
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
540
                if !k8serrors.IsNotFound(err) {
1✔
541
                        return nil, nil, err
×
542
                }
×
543
                dv = nil
1✔
544
        }
545

546
        pvc := &corev1.PersistentVolumeClaim{}
1✔
547
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
548
                if !k8serrors.IsNotFound(err) {
1✔
549
                        return nil, nil, err
×
550
                }
×
551
                pvc = nil
1✔
552
        }
553
        return dv, pvc, nil
1✔
554
}
555

556
// Returns the current import DV if exists, and the last imported PVC
557
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
558
        imports := cron.Status.CurrentImports
1✔
559
        if len(imports) == 0 {
2✔
560
                return nil, nil
1✔
561
        }
1✔
562

563
        snapName := imports[0].DataVolumeName
1✔
564
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
565
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
566
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
567
                        return nil, err
×
568
                }
×
569
                return nil, nil
1✔
570
        }
571

572
        return snapshot, nil
1✔
573
}
574

575
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
576
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
577
        dataSource := &cdiv1.DataSource{}
1✔
578
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
579
                return nil, err
1✔
580
        }
1✔
581
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
582
                log := r.log.WithName("getCronManagedDataSource")
×
583
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
584
                return nil, ErrNotManagedByCron
×
585
        }
×
586
        return dataSource, nil
1✔
587
}
588

589
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
590
        objCopy := obj.DeepCopyObject()
1✔
591
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
592
        r.setDataImportCronResourceLabels(cron, obj)
1✔
593
        if !reflect.DeepEqual(obj, objCopy) {
2✔
594
                if err := r.client.Update(ctx, obj); err != nil {
1✔
595
                        return err
×
596
                }
×
597
        }
598
        return nil
1✔
599
}
600

601
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
602
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
603
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
604
                if cond.Status == corev1.ConditionFalse &&
×
605
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
606
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
607
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
608
                        dv.Labels[common.DataImportCronLabel] = ""
×
609
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
610
                                return err
×
611
                        }
×
612
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
613
                                return err
×
614
                        }
×
615
                        cron.Status.CurrentImports = nil
×
616
                }
617
        }
618
        return nil
×
619
}
620

621
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
622
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
623
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
624
        if err != nil {
1✔
625
                return err
×
626
        }
×
627
        if regSource.ImageStream == nil {
1✔
628
                return nil
×
629
        }
×
630
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
631
        if err != nil {
2✔
632
                return err
1✔
633
        }
1✔
634
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
635
        if err != nil {
2✔
636
                return err
1✔
637
        }
1✔
638
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
639
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
640
                log.Info("Updating DataImportCron", "digest", digest)
1✔
641
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
642
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
643
        }
1✔
644
        return nil
1✔
645
}
646

647
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
648
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
649
        podName := getPollerPodName(cron)
1✔
650
        ns := cron.Namespace
1✔
651
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
652
        pod := &corev1.Pod{}
1✔
653

1✔
654
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
655
                digest, err := fetchContainerImageDigest(pod)
1✔
656
                if err != nil || digest == "" {
1✔
657
                        return false, err
×
658
                }
×
659
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
660
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
661
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
662
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
663
                }
1✔
664
                return true, r.client.Delete(ctx, pod)
1✔
665
        } else if cc.IgnoreNotFound(err) != nil {
1✔
666
                return false, err
×
667
        }
×
668

669
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
670
        if err != nil {
1✔
671
                return false, err
×
672
        }
×
673
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
1✔
674
        if platform != nil && platform.Architecture != "" {
1✔
675
                if workloadNodePlacement.NodeSelector == nil {
×
676
                        workloadNodePlacement.NodeSelector = map[string]string{}
×
677
                }
×
678
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
679
        }
680

681
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
682

1✔
683
        pod = &corev1.Pod{
1✔
684
                ObjectMeta: metav1.ObjectMeta{
1✔
685
                        Name:      podName,
1✔
686
                        Namespace: ns,
1✔
687
                        OwnerReferences: []metav1.OwnerReference{
1✔
688
                                {
1✔
689
                                        APIVersion:         cron.APIVersion,
1✔
690
                                        Kind:               cron.Kind,
1✔
691
                                        Name:               cron.Name,
1✔
692
                                        UID:                cron.UID,
1✔
693
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
694
                                        Controller:         ptr.To[bool](true),
1✔
695
                                },
1✔
696
                        },
1✔
697
                },
1✔
698
                Spec: corev1.PodSpec{
1✔
699
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
700
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
701
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
702
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
703
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
704
                        Volumes: []corev1.Volume{
1✔
705
                                {
1✔
706
                                        Name: "shared-volume",
1✔
707
                                        VolumeSource: corev1.VolumeSource{
1✔
708
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
709
                                        },
1✔
710
                                },
1✔
711
                        },
1✔
712
                        InitContainers: []corev1.Container{
1✔
713
                                {
1✔
714
                                        Name:                     "init",
1✔
715
                                        Image:                    r.image,
1✔
716
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
717
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
718
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
719
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
720
                                },
1✔
721
                        },
1✔
722
                        Containers: []corev1.Container{
1✔
723
                                {
1✔
724
                                        Name:                     "image-container",
1✔
725
                                        Image:                    containerImage,
1✔
726
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
727
                                        Command:                  []string{"/shared/server", "-h"},
1✔
728
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
729
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
730
                                },
1✔
731
                        },
1✔
732
                },
1✔
733
        }
1✔
734

1✔
735
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
736
        if pod.Spec.SecurityContext != nil {
2✔
737
                pod.Spec.SecurityContext.FSGroup = nil
1✔
738
        }
1✔
739

740
        return false, r.client.Create(ctx, pod)
1✔
741
}
742

743
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
744
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
745
                return "", nil
×
746
        }
×
747

748
        status := pod.Status.ContainerStatuses[0]
1✔
749
        if status.State.Waiting != nil {
1✔
750
                reason := status.State.Waiting.Reason
×
751
                switch reason {
×
752
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
753
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
754
                }
755
                return "", nil
×
756
        }
757

758
        if status.State.Terminated == nil {
1✔
759
                return "", nil
×
760
        }
×
761

762
        imageID := status.ImageID
1✔
763
        if imageID == "" {
1✔
764
                return "", errors.Errorf("Container has no imageID")
×
765
        }
×
766
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
767
        if idx < 0 {
1✔
768
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
769
        }
×
770

771
        return imageID[idx:], nil
1✔
772
}
773

774
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
775
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
776
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
777
        if err != nil {
1✔
778
                return err
×
779
        }
×
780
        ns := pvcSource.Namespace
1✔
781
        if ns == "" {
2✔
782
                ns = dataImportCron.Namespace
1✔
783
        }
1✔
784
        pvc := &corev1.PersistentVolumeClaim{}
1✔
785
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
786
                return err
1✔
787
        }
1✔
788
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
789
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
790
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
791
                log.Info("Updating DataImportCron", "digest", digest)
1✔
792
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
793
        }
1✔
794
        return nil
1✔
795
}
796

797
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
798
        log := r.log.WithName("updateDataSource")
1✔
799
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
800
        if err != nil {
2✔
801
                if k8serrors.IsNotFound(err) {
2✔
802
                        dataSource = r.newDataSource(dataImportCron)
1✔
803
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
804
                                return err
×
805
                        }
×
806
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
807
                } else if errors.Is(err, ErrNotManagedByCron) {
×
808
                        return nil
×
809
                } else {
×
810
                        return err
×
811
                }
×
812
        }
813
        dataSourceCopy := dataSource.DeepCopy()
1✔
814
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
815

1✔
816
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
817
        populateDataSource(format, dataSource, sourcePVC)
1✔
818

1✔
819
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
820
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
821
                        return err
×
822
                }
×
823
        }
824

825
        return nil
1✔
826
}
827

828
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
829
        if sourcePVC == nil {
2✔
830
                return
1✔
831
        }
1✔
832

833
        switch format {
1✔
834
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
835
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
836
                        PVC: sourcePVC,
1✔
837
                }
1✔
838
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
839
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
840
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
841
                                Namespace: sourcePVC.Namespace,
1✔
842
                                Name:      sourcePVC.Name,
1✔
843
                        },
1✔
844
                }
1✔
845
        }
846
}
847

848
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
849
        if dataImportCron.Status.CurrentImports == nil {
1✔
850
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
851
        }
×
852
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
853
                Namespace: dataImportCron.Namespace,
1✔
854
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
855
        }
1✔
856
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
857
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
858
                now := metav1.Now()
1✔
859
                dataImportCron.Status.LastImportTimestamp = &now
1✔
860
        }
1✔
861
        return nil
1✔
862
}
863

864
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
865
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
866
        if lastTimeStr == "" {
2✔
867
                return nil
1✔
868
        }
1✔
869
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
870
        if err != nil {
1✔
871
                return err
×
872
        }
×
873
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
874
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
875
        }
1✔
876
        return nil
1✔
877
}
878

879
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass *storagev1.StorageClass) error {
1✔
880
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
881
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
882
        if digest == "" {
1✔
883
                return nil
×
884
        }
×
885
        dvName, err := createDvName(dataSourceName, digest)
1✔
886
        if err != nil {
2✔
887
                return err
1✔
888
        }
1✔
889

890
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
891
        for _, src := range sources {
2✔
892
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
893
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
894
                                return err
×
895
                        }
×
896
                } else {
1✔
897
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
898
                                return err
×
899
                        }
×
900
                        // If source exists don't create DV
901
                        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
902
                        return nil
1✔
903
                }
904
        }
905

906
        storageProfile := &cdiv1.StorageProfile{}
1✔
907
        if desiredStorageClass != nil {
2✔
908
                if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
909
                        return err
×
910
                }
×
911
        }
912
        dv := r.newSourceDataVolume(dataImportCron, dvName, storageProfile)
1✔
913
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
1✔
914
                return err
×
915
        } else if !allowed {
2✔
916
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
1✔
917
                        "Not authorized to create DataVolume", notAuthorized)
1✔
918
                return nil
1✔
919
        }
1✔
920
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
921
                return err
×
922
        }
×
923
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
924

1✔
925
        return nil
1✔
926
}
927

928
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
1✔
929
        if !isPvcSource(dataImportCron) {
2✔
930
                return true, nil
1✔
931
        }
1✔
932
        saName := "default"
1✔
933
        if dataImportCron.Spec.ServiceAccountName != nil {
1✔
934
                saName = *dataImportCron.Spec.ServiceAccountName
×
935
        }
×
936
        if resp, err := dv.AuthorizeSA(dv.Namespace, dv.Name, &authProxy{r.client}, dataImportCron.Namespace, saName); err != nil {
1✔
937
                return false, err
×
938
        } else if !resp.Allowed {
2✔
939
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
1✔
940
                return false, nil
1✔
941
        }
1✔
942

943
        return true, nil
1✔
944
}
945

946
type authProxy struct {
947
        client client.Client
948
}
949

950
func (p *authProxy) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
1✔
951
        if err := p.client.Create(context.TODO(), sar); err != nil {
1✔
952
                return nil, err
×
953
        }
×
954
        return sar, nil
1✔
955
}
956

957
func (p *authProxy) GetNamespace(name string) (*corev1.Namespace, error) {
1✔
958
        ns := &corev1.Namespace{}
1✔
959
        if err := p.client.Get(context.TODO(), types.NamespacedName{Name: name}, ns); err != nil {
1✔
960
                return nil, err
×
961
        }
×
962
        return ns, nil
1✔
963
}
964

965
func (p *authProxy) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
×
966
        das := &cdiv1.DataSource{}
×
967
        if err := p.client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, das); err != nil {
×
968
                return nil, err
×
969
        }
×
970
        return das, nil
×
971
}
972

973
func (r *DataImportCronReconciler) recreateImport(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
974
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
975
        if !ok {
1✔
976
                // nothing to delete
×
977
                return nil
×
978
        }
×
979
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
980
        if err != nil {
1✔
981
                return err
×
982
        }
×
983
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
984
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
985
        for _, src := range sources {
2✔
986
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
987
                        return err
×
988
                }
×
989
        }
990
        for _, src := range sources {
2✔
991
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
992
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
993
                }
×
994
        }
995
        // Only update desired storage class once garbage collection went through
996
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
997
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
998
        if err != nil {
1✔
999
                return err
×
1000
        }
×
1001

1002
        return nil
1✔
1003
}
1004

1005
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
1006
        switch format {
1✔
1007
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1008
                return nil
1✔
1009
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1010
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
1011
        default:
×
1012
                return fmt.Errorf("unknown source format for snapshot")
×
1013
        }
1014
}
1015

1016
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
1017
        if pvc == nil {
1✔
1018
                return nil
×
1019
        }
×
1020
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
1021
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
1022
                return nil
1✔
1023
        }
1✔
1024
        storageProfile := &cdiv1.StorageProfile{}
1✔
1025
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1026
                return err
×
1027
        }
×
1028

1029
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
1030
                ObjectMeta: metav1.ObjectMeta{
1✔
1031
                        Name:      pvc.Name,
1✔
1032
                        Namespace: dataImportCron.Namespace,
1✔
1033
                        Labels: map[string]string{
1✔
1034
                                common.CDILabelKey:       common.CDILabelValue,
1✔
1035
                                common.CDIComponentLabel: "",
1✔
1036
                        },
1✔
1037
                },
1✔
1038
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1039
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1040
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1041
                        },
1✔
1042
                },
1✔
1043
        }
1✔
1044
        // Select VolumeSnapshotClass for boot source snapshot
1✔
1045
        snapshotClassName, err := r.getSnapshotClassForDataImportCron(pvc, storageProfile)
1✔
1046
        if err != nil {
1✔
1047
                return err
×
1048
        }
×
1049
        if snapshotClassName != "" {
2✔
1050
                desiredSnapshot.Spec.VolumeSnapshotClassName = &snapshotClassName
1✔
1051
        }
1✔
1052
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
1053
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
1054

1✔
1055
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
1056
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
1057
                if !k8serrors.IsNotFound(err) {
1✔
1058
                        return err
×
1059
                }
×
1060
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1061
                pvcSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
1✔
1062
                size := inferAdvisedRestoreSizeForSnapshot(&dataImportCron.Spec.Template, desiredSnapshot, &pvcSize)
1✔
1063
                if size != nil {
2✔
1064
                        cc.AddAnnotation(desiredSnapshot, cc.AnnAdvisedRestoreSize, size.String())
1✔
1065
                }
1✔
1066
                if pvc.Spec.VolumeMode != nil {
2✔
1067
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
1068
                }
1✔
1069
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
1070
                        return err
×
1071
                }
×
1072
        } else {
1✔
1073
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
1074
                        // Clean up DV/PVC as they are not needed anymore
1✔
1075
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1076
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
1077
                                return err
×
1078
                        }
×
1079
                }
1080
        }
1081

1082
        return nil
1✔
1083
}
1084

1085
func (r *DataImportCronReconciler) handleSnapshotClassChange(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot, storageClassName string) (bool, error) {
1✔
1086
        sp := &cdiv1.StorageProfile{}
1✔
1087
        if err := r.client.Get(ctx, types.NamespacedName{Name: storageClassName}, sp); err != nil {
1✔
1088
                return false, client.IgnoreNotFound(err)
×
1089
        }
×
1090

1091
        desiredVSC, err := r.getSnapshotClassForDataImportCron(nil, sp)
1✔
1092
        if err != nil {
1✔
1093
                return false, err
×
1094
        }
×
1095
        actualVSC := ""
1✔
1096
        if snapshot.Spec.VolumeSnapshotClassName != nil {
1✔
1097
                actualVSC = *snapshot.Spec.VolumeSnapshotClassName
×
1098
        }
×
1099
        if desiredVSC == "" || actualVSC == desiredVSC {
2✔
1100
                return false, nil
1✔
1101
        }
1✔
1102

1103
        r.log.Info("Snapshot class changed, deleting", "name", snapshot.Name, "from", actualVSC, "to", desiredVSC)
×
1104
        if err := r.client.Delete(ctx, snapshot); err != nil {
×
1105
                return false, client.IgnoreNotFound(err)
×
1106
        }
×
1107
        return true, nil
×
1108
}
1109

1110
func (r *DataImportCronReconciler) importOutdated(ctx context.Context, cron *cdiv1.DataImportCron, currentDV *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, desiredSC *storagev1.StorageClass) (bool, error) {
1✔
1111
        currentSc, hasCurrent := cron.Annotations[AnnStorageClass]
1✔
1112
        scChanged := currentSc != desiredSC.Name
1✔
1113
        needsReset := hasCurrent || len(cron.Status.CurrentImports) > 0
1✔
1114
        if scChanged && needsReset {
2✔
1115
                r.log.Info("Storage class changed, resetting import", "currentSc", currentSc, "desiredSc", desiredSC.Name)
1✔
1116
                return true, nil
1✔
1117
        }
1✔
1118

1119
        if pvc != nil && pvc.Status.Phase == corev1.ClaimPending &&
1✔
1120
                pvc.Labels[common.DataImportCronLabel] == cron.Name &&
1✔
1121
                pvc.Spec.StorageClassName != nil && *pvc.Spec.StorageClassName != desiredSC.Name {
2✔
1122
                r.log.Info("Pending PVC has outdated storage class, resetting import", "pvc", pvc.Name, "pvcSc", *pvc.Spec.StorageClassName, "desiredSc", desiredSC.Name)
1✔
1123
                return true, nil
1✔
1124
        }
1✔
1125

1126
        if currentDV != nil {
2✔
1127
                sp := &cdiv1.StorageProfile{}
1✔
1128
                if err := r.client.Get(ctx, types.NamespacedName{Name: desiredSC.Name}, sp); err != nil {
1✔
NEW
1129
                        return false, err
×
NEW
1130
                }
×
1131
                desiredDV := r.newSourceDataVolume(cron, currentDV.Name, sp)
1✔
1132
                if !reflect.DeepEqual(currentDV.Spec, desiredDV.Spec) {
2✔
1133
                        r.log.Info("Import DV spec changed, resetting import", "dv", currentDV.Name)
1✔
1134
                        return true, nil
1✔
1135
                }
1✔
1136
        }
1137

1138
        return false, nil
1✔
1139
}
1140

1141
// getSnapshotClassForDataImportCron returns the VolumeSnapshotClass name to use for DataImportCron snapshots.
1142
func (r *DataImportCronReconciler) getSnapshotClassForDataImportCron(pvc *corev1.PersistentVolumeClaim, storageProfile *cdiv1.StorageProfile) (string, error) {
1✔
1143
        if vscName := storageProfile.Annotations[cc.AnnSnapshotClassForDataImportCron]; vscName != "" {
2✔
1144
                return vscName, nil
1✔
1145
        }
1✔
1146
        return cc.GetSnapshotClassForSmartClone(pvc, &storageProfile.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
1147
}
1148

1149
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1150
        dataImportCron.Status.SourceFormat = &format
1✔
1151

1✔
1152
        switch format {
1✔
1153
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1154
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1155
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1156
                if snapshot == nil {
2✔
1157
                        // Snapshot create/update will trigger reconcile
1✔
1158
                        return nil
1✔
1159
                }
1✔
1160
                if cc.IsSnapshotReady(snapshot) {
2✔
1161
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1162
                } else {
2✔
1163
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1164
                }
1✔
1165
        default:
×
1166
                return fmt.Errorf("unknown source format for snapshot")
×
1167
        }
1168

1169
        return nil
1✔
1170
}
1171

1172
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1173
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1174
        if desiredStorageClass == nil {
2✔
1175
                return format, nil
1✔
1176
        }
1✔
1177

1178
        storageProfile := &cdiv1.StorageProfile{}
1✔
1179
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1180
                return format, err
×
1181
        }
×
1182
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1183
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1184
        }
1✔
1185

1186
        return format, nil
1✔
1187
}
1188

1189
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1190
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1191
                return nil
×
1192
        }
×
1193
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1194
        if err != nil {
1✔
1195
                return err
×
1196
        }
×
1197

1198
        maxImports := defaultImportsToKeepPerCron
1✔
1199

1✔
1200
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1201
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1202
        }
1✔
1203

1204
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1205
                return err
×
1206
        }
×
1207
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
1208
                return err
×
1209
        }
×
1210

1211
        return nil
1✔
1212
}
1213

1214
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1215
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1216

1✔
1217
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1218
                return err
×
1219
        }
×
1220
        if len(pvcList.Items) > maxImports {
2✔
1221
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1222
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1223
                })
1✔
1224
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1225
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1226
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1227
                                return err
×
1228
                        }
×
1229
                }
1230
        }
1231

1232
        dvList := &cdiv1.DataVolumeList{}
1✔
1233
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1234
                return err
×
1235
        }
×
1236

1237
        if len(dvList.Items) > maxImports {
2✔
1238
                for _, dv := range dvList.Items {
2✔
1239
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1240
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
1241
                                return err
×
1242
                        }
×
1243

1244
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1245
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1246
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
1247
                                        return err
×
1248
                                }
×
1249
                        }
1250
                }
1251
        }
1252

1253
        return nil
1✔
1254
}
1255

1256
// deleteDvPvc deletes DV or PVC if DV was GCed
1257
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1258
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1259
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1260
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1261
                return err
1✔
1262
        }
1✔
1263
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1264
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1265
                return err
×
1266
        }
×
1267
        return nil
1✔
1268
}
1269

1270
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1271
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1272

1✔
1273
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1274
                if meta.IsNoMatchError(err) {
×
1275
                        return nil
×
1276
                }
×
1277
                return err
×
1278
        }
1279
        if len(snapList.Items) > maxImports {
1✔
1280
                sort.Slice(snapList.Items, func(i, j int) bool {
×
1281
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1282
                })
×
1283
                for _, snap := range snapList.Items[maxImports:] {
×
1284
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
1285
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1286
                                return err
×
1287
                        }
×
1288
                }
1289
        }
1290

1291
        return nil
1✔
1292
}
1293

1294
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1295
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1296
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1297

1✔
1298
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1299
                return err
×
1300
        }
×
1301
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1302
        if err != nil {
1✔
1303
                return err
×
1304
        }
×
1305
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1306
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1307
                return err
×
1308
        }
×
1309
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1310
                return err
×
1311
        }
×
1312
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1313
                return err
×
1314
        }
×
1315
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1316
                return err
×
1317
        }
×
1318
        return nil
1✔
1319
}
1320

1321
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1322
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1323
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1324
        if err != nil {
1✔
1325
                return err
×
1326
        }
×
1327
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1328
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1329
                return err
×
1330
        }
×
1331
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1332
                return err
×
1333
        }
×
1334

1335
        return nil
1✔
1336
}
1337

1338
// NewDataImportCronController creates a new instance of the DataImportCron controller
1339
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1340
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1341
                Scheme: mgr.GetScheme(),
×
1342
                Mapper: mgr.GetRESTMapper(),
×
1343
        })
×
1344
        if err != nil {
×
1345
                return nil, err
×
1346
        }
×
1347
        reconciler := &DataImportCronReconciler{
×
1348
                client:          mgr.GetClient(),
×
1349
                uncachedClient:  uncachedClient,
×
1350
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1351
                scheme:          mgr.GetScheme(),
×
1352
                log:             log.WithName(dataImportControllerName),
×
1353
                image:           importerImage,
×
1354
                pullPolicy:      pullPolicy,
×
1355
                cdiNamespace:    util.GetNamespace(),
×
1356
                installerLabels: installerLabels,
×
1357
        }
×
1358
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1359
                MaxConcurrentReconciles: 3,
×
1360
                Reconciler:              reconciler,
×
1361
        })
×
1362
        if err != nil {
×
1363
                return nil, err
×
1364
        }
×
1365
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1366
                return nil, err
×
1367
        }
×
1368
        log.Info("Initialized DataImportCron controller")
×
1369
        return dataImportCronController, nil
×
1370
}
1371

1372
func getCronName(obj client.Object) string {
×
1373
        return obj.GetLabels()[common.DataImportCronLabel]
×
1374
}
×
1375

1376
func getCronNs(obj client.Object) string {
×
1377
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1378
}
×
1379

1380
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1381
        if cronName := getCronName(obj); cronName != "" {
×
1382
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1383
        }
×
1384
        return nil
×
1385
}
1386

1387
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1388
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1389
                return err
×
1390
        }
×
1391

1392
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1393
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1394
                // Otherwise we risk losing the storage profile event
×
1395
                var crons cdiv1.DataImportCronList
×
1396
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1397
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1398
                        return nil
×
1399
                }
×
1400
                // Storage profiles are 1:1 to storage classes
1401
                scName := obj.GetName()
×
1402
                var reqs []reconcile.Request
×
1403
                for _, cron := range crons.Items {
×
1404
                        dataVolume := cron.Spec.Template
×
1405
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1406
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1407
                        if err != nil || templateSc == nil {
×
1408
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1409
                                return reqs
×
1410
                        }
×
1411
                        if templateSc.Name == scName {
×
1412
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1413
                        }
×
1414
                }
1415
                return reqs
×
1416
        }
1417

1418
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1419
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1420
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1421
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1422
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1423
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1424
                },
1425
        )); err != nil {
×
1426
                return err
×
1427
        }
×
1428

1429
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1430
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1431
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1432
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1433
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1434
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1435
                },
1436
        )); err != nil {
×
1437
                return err
×
1438
        }
×
1439

1440
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1441
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1442
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1443
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1444
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1445
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1446
                },
1447
        )); err != nil {
×
1448
                return err
×
1449
        }
×
1450

1451
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1452
                return err
×
1453
        }
×
1454

1455
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1456
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1457
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1458
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1459
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1460
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1461
                                return dicRelevantFieldsChanged(e.ObjectOld, e.ObjectNew)
×
1462
                        },
×
1463
                },
1464
        )); err != nil {
×
1465
                return err
×
1466
        }
×
1467

1468
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1469
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1470
        }
×
1471

1472
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1473
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1474
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1475
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1476
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1477
                        },
×
1478
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1479
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1480
                },
1481
        )); err != nil {
×
1482
                return err
×
1483
        }
×
1484

1485
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1486
                if meta.IsNoMatchError(err) {
×
1487
                        // Back out if there's no point to attempt watch
×
1488
                        return nil
×
1489
                }
×
1490
                if !cc.IsErrCacheNotStarted(err) {
×
1491
                        return err
×
1492
                }
×
1493
        }
1494
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1495
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1496
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1497
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1498
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1499
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1500
                },
1501
        )); err != nil {
×
1502
                return err
×
1503
        }
×
1504

1505
        return nil
×
1506
}
1507

1508
func dicRelevantFieldsChanged(oldSp, newSp *cdiv1.StorageProfile) bool {
×
1509
        sourceFormatChanged := oldSp.Status.DataImportCronSourceFormat != newSp.Status.DataImportCronSourceFormat
×
1510
        rwoAnnotationChanged := oldSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] != newSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron]
×
1511
        snapshotClassChanged := oldSp.Annotations[cc.AnnSnapshotClassForDataImportCron] != newSp.Annotations[cc.AnnSnapshotClassForDataImportCron]
×
1512
        return sourceFormatChanged || rwoAnnotationChanged || snapshotClassChanged
×
1513
}
×
1514

1515
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1516
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1517
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1518
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1519
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1520
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1521
                                log.Info("Update", "sc", obj.GetName(),
×
1522
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1523
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1524
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1525
                                if err != nil {
×
1526
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1527
                                }
×
1528
                                return reqs
×
1529
                        },
1530
                ),
1531
                predicate.TypedFuncs[*storagev1.StorageClass]{
1532
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1533
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1534
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1535
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1536
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1537
                        },
×
1538
                },
1539
        )); err != nil {
×
1540
                return err
×
1541
        }
×
1542

1543
        return nil
×
1544
}
1545

1546
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1547
        dicList := &cdiv1.DataImportCronList{}
×
1548
        if err := c.List(ctx, dicList); err != nil {
×
1549
                return nil, err
×
1550
        }
×
1551
        reqs := []reconcile.Request{}
×
1552
        for _, dic := range dicList.Items {
×
1553
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1554
                        continue
×
1555
                }
1556

1557
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1558
        }
1559

1560
        return reqs, nil
×
1561
}
1562

1563
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1564
        cronJob := &batchv1.CronJob{}
1✔
1565
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1566
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1567
                return false, cc.IgnoreNotFound(err)
1✔
1568
        }
1✔
1569

1570
        cronJobCopy := cronJob.DeepCopy()
1✔
1571
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1572
                return false, err
×
1573
        }
×
1574

1575
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1576
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1577
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1578
                        return false, cc.IgnoreNotFound(err)
×
1579
                }
×
1580
        }
1581
        return true, nil
1✔
1582
}
1583

1584
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1585
        cronJob := &batchv1.CronJob{
1✔
1586
                ObjectMeta: metav1.ObjectMeta{
1✔
1587
                        Name:      GetCronJobName(cron),
1✔
1588
                        Namespace: r.cdiNamespace,
1✔
1589
                },
1✔
1590
        }
1✔
1591
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1592
                return nil, err
×
1593
        }
×
1594
        return cronJob, nil
1✔
1595
}
1596

1597
// InitPollerPod inits poller Pod
1598
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1599
        regSource, err := getCronRegistrySource(cron)
1✔
1600
        if err != nil {
1✔
1601
                return err
×
1602
        }
×
1603
        if regSource.URL == nil {
1✔
1604
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1605
        }
×
1606
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1607
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1608
                return err
×
1609
        }
×
1610
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1611
        if err != nil {
1✔
1612
                return err
×
1613
        }
×
1614
        container := corev1.Container{
1✔
1615
                Name:  "cdi-source-update-poller",
1✔
1616
                Image: image,
1✔
1617
                Command: []string{
1✔
1618
                        "/usr/bin/cdi-source-update-poller",
1✔
1619
                        "-ns", cron.Namespace,
1✔
1620
                        "-cron", cron.Name,
1✔
1621
                        "-url", *regSource.URL,
1✔
1622
                },
1✔
1623
                ImagePullPolicy:          pullPolicy,
1✔
1624
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1625
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1626
        }
1✔
1627

1✔
1628
        var volumes []corev1.Volume
1✔
1629
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1630
        if hasCertConfigMap {
1✔
1631
                vm := corev1.VolumeMount{
×
1632
                        Name:      CertVolName,
×
1633
                        MountPath: common.ImporterCertDir,
×
1634
                }
×
1635
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1636
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1637
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1638
        }
×
1639

1640
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1641
                vm := corev1.VolumeMount{
1✔
1642
                        Name:      ProxyCertVolName,
1✔
1643
                        MountPath: common.ImporterProxyCertDir,
1✔
1644
                }
1✔
1645
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1646
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1647
        }
1✔
1648

1649
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1650
                container.Env = append(container.Env,
×
1651
                        corev1.EnvVar{
×
1652
                                Name: common.ImporterAccessKeyID,
×
1653
                                ValueFrom: &corev1.EnvVarSource{
×
1654
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1655
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1656
                                                        Name: *regSource.SecretRef,
×
1657
                                                },
×
1658
                                                Key: common.KeyAccess,
×
1659
                                        },
×
1660
                                },
×
1661
                        },
×
1662
                        corev1.EnvVar{
×
1663
                                Name: common.ImporterSecretKey,
×
1664
                                ValueFrom: &corev1.EnvVarSource{
×
1665
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1666
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1667
                                                        Name: *regSource.SecretRef,
×
1668
                                                },
×
1669
                                                Key: common.KeySecret,
×
1670
                                        },
×
1671
                                },
×
1672
                        },
×
1673
                )
×
1674
        }
×
1675

1676
        addEnvVar := func(varName, value string) {
2✔
1677
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1678
        }
1✔
1679

1680
        if insecureTLS {
1✔
1681
                addEnvVar(common.InsecureTLSVar, "true")
×
1682
        }
×
1683

1684
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1685
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1686
                        addEnvVar(varName, value)
1✔
1687
                }
1✔
1688
        }
1689

1690
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1691
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1692
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1693

1✔
1694
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1695
        if err != nil {
1✔
1696
                return err
×
1697
        }
×
1698
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1699
        if err != nil {
1✔
1700
                return err
×
1701
        }
×
1702

1703
        podSpec := &pod.Spec
1✔
1704

1✔
1705
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1706
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1707
        podSpec.Containers = []corev1.Container{container}
1✔
1708
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1709
        podSpec.Volumes = volumes
1✔
1710
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1711
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1712
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1713
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1714

1✔
1715
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1716
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1717
        if podSpec.SecurityContext != nil {
2✔
1718
                podSpec.SecurityContext.FSGroup = nil
1✔
1719
        }
1✔
1720
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1721
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1722
        }
1✔
1723

1724
        if pod.Labels == nil {
2✔
1725
                pod.Labels = map[string]string{}
1✔
1726
        }
1✔
1727
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1728

1✔
1729
        return nil
1✔
1730
}
1731

1732
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1733
        cronJobSpec := &cronJob.Spec
1✔
1734
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1735
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1736
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1737
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1738

1✔
1739
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1740
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1741
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1742
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1743

1✔
1744
        pod := &jobSpec.Template
1✔
1745
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1746
                return err
×
1747
        }
×
1748
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1749
                return err
×
1750
        }
×
1751
        return nil
1✔
1752
}
1753

1754
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1755
        job := &batchv1.Job{
1✔
1756
                ObjectMeta: metav1.ObjectMeta{
1✔
1757
                        Name:      GetInitialJobName(cron),
1✔
1758
                        Namespace: cronJob.Namespace,
1✔
1759
                },
1✔
1760
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1761
        }
1✔
1762
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1763
                return nil, err
×
1764
        }
×
1765
        return job, nil
1✔
1766
}
1767

1768
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1769
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1770
                return err
×
1771
        }
×
1772
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1773
        labels := obj.GetLabels()
1✔
1774
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1775
        labels[common.DataImportCronLabel] = cron.Name
1✔
1776
        obj.SetLabels(labels)
1✔
1777
        return nil
1✔
1778
}
1779

1780
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string, storageProfile *cdiv1.StorageProfile) *cdiv1.DataVolume {
1✔
1781
        dv := cron.Spec.Template.DeepCopy()
1✔
1782
        if isCronRegistrySource(cron) {
2✔
1783
                var digestedURL string
1✔
1784
                if isURLSource(cron) {
2✔
1785
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1786
                } else if isImageStreamSource(cron) {
3✔
1787
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1788
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1789
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1790
                }
1✔
1791
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1792
        }
1793
        dv.Name = dataVolumeName
1✔
1794
        dv.Namespace = cron.Namespace
1✔
1795
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1796
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1797
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1798
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1799

1✔
1800
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1801
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1802
        }
1✔
1803

1804
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1805

1✔
1806
        // Apply RWO access mode as default for DataImportCron (from StorageProfile annotation)
1✔
1807
        // Only applies if the DV doesn't already have AccessModes configured
1✔
1808
        if storageProfile != nil && storageProfile.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] == "true" {
2✔
1809
                if dv.Spec.Storage != nil && len(dv.Spec.Storage.AccessModes) == 0 {
2✔
1810
                        dv.Spec.Storage.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
1✔
1811
                }
1✔
1812
        }
1813

1814
        return dv
1✔
1815
}
1816

1817
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1818
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1819
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
1✔
1820
        labels := obj.GetLabels()
1✔
1821
        labels[common.DataImportCronLabel] = cron.Name
1✔
1822
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1823
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1824
        }
1✔
1825
        obj.SetLabels(labels)
1✔
1826
}
1827

1828
func untagDigestedDockerURL(dockerURL string) string {
1✔
1829
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1830
                url := u.Host + u.Path
1✔
1831
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1832
                // Check for tag
1✔
1833
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1834
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1835
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1836
                        }
1✔
1837
                }
1838
        }
1839
        return dockerURL
1✔
1840
}
1841

1842
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1843
        if val := cron.Labels[ann]; val != "" {
2✔
1844
                cc.AddLabel(dv, ann, val)
1✔
1845
        }
1✔
1846
}
1847

1848
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1849
        if val := cron.Annotations[ann]; val != "" {
1✔
1850
                cc.AddAnnotation(dv, ann, val)
×
1851
        }
×
1852
}
1853

1854
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1855
        dataSource := &cdiv1.DataSource{
1✔
1856
                ObjectMeta: metav1.ObjectMeta{
1✔
1857
                        Name:      cron.Spec.ManagedDataSource,
1✔
1858
                        Namespace: cron.Namespace,
1✔
1859
                },
1✔
1860
        }
1✔
1861
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1862
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1863
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1✔
1864
        return dataSource
1✔
1865
}
1✔
1866

1867
// Create DataVolume name based on the DataSource name + prefix of the digest
1868
func createDvName(prefix, digest string) (string, error) {
1✔
1869
        digestPrefix := ""
1✔
1870
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1871
                digestPrefix = digestSha256Prefix
1✔
1872
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1873
                digestPrefix = digestUIDPrefix
1✔
1874
        } else {
2✔
1875
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1876
        }
1✔
1877
        fromIdx := len(digestPrefix)
1✔
1878
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1879
        if len(digest) < toIdx {
2✔
1880
                return "", errors.Errorf("Digest is too short")
1✔
1881
        }
1✔
1882
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1883
}
1884

1885
// GetCronJobName get CronJob name based on cron name and UID
1886
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1887
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1888
}
1✔
1889

1890
// GetInitialJobName get initial job name based on cron name and UID
1891
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1892
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1893
}
1✔
1894

1895
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1896
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1✔
1897
}
1✔
1898

1899
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1900
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1901
}
1✔
1902

1903
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1904
        dv := &cron.Spec.Template
1✔
1905

1✔
1906
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1907
                return explicitVolumeMode, nil
×
1908
        }
×
1909

1910
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1911
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1912
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1913
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1914
                        AccessModes:      accessModes,
1✔
1915
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1916
                        Resources: corev1.VolumeResourceRequirements{
1✔
1917
                                Requests: corev1.ResourceList{
1✔
1918
                                        // Doesn't matter
1✔
1919
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1920
                                },
1✔
1921
                        },
1✔
1922
                },
1✔
1923
        }
1✔
1924
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1925
                return nil, err
×
1926
        }
×
1927

1928
        return inferredPvc.Spec.VolumeMode, nil
1✔
1929
}
1930

1931
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1932
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1933
        if dv.Spec.PVC != nil {
1✔
1934
                return dv.Spec.PVC.VolumeMode
×
1935
        }
×
1936

1937
        if dv.Spec.Storage != nil {
2✔
1938
                return dv.Spec.Storage.VolumeMode
1✔
1939
        }
1✔
1940

1941
        return nil
×
1942
}
1943

1944
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1945
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1946
        if dv.Spec.PVC != nil {
1✔
1947
                return dv.Spec.PVC.AccessModes
×
1948
        }
×
1949

1950
        if dv.Spec.Storage != nil {
2✔
1951
                return dv.Spec.Storage.AccessModes
1✔
1952
        }
1✔
1953

1954
        return nil
×
1955
}
1956

1957
func inferAdvisedRestoreSizeForSnapshot(dv *cdiv1.DataVolume, snapshot *snapshotv1.VolumeSnapshot, fallback *resource.Quantity) *resource.Quantity {
1✔
1958
        var dvSize resource.Quantity
1✔
1959

1✔
1960
        if dv.Spec.PVC != nil {
1✔
1961
                dvSize = dv.Spec.PVC.Resources.Requests[corev1.ResourceStorage]
×
1962
        }
×
1963

1964
        if dv.Spec.Storage != nil {
2✔
1965
                dvSize = dv.Spec.Storage.Resources.Requests[corev1.ResourceStorage]
1✔
1966
        }
1✔
1967

1968
        if dvSize.IsZero() && fallback != nil {
2✔
1969
                return fallback
1✔
1970
        }
1✔
1971

1972
        if snapshot.Status != nil {
2✔
1973
                if rs := snapshot.Status.RestoreSize; rs != nil && dvSize.Cmp(*rs) < 0 {
2✔
1974
                        return snapshot.Status.RestoreSize
1✔
1975
                }
1✔
1976
        }
1977

1978
        return &dvSize
1✔
1979
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc