• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #6087

29 Jun 2026 03:00PM UTC coverage: 49.73% (-0.05%) from 49.781%
#6087

Pull #4074

travis-ci

noamasu
Fix DataImportCron reconciliation when first default StorageClass is set

The default SC change condition only handled changes from one SC to another, but did not handle the first default SC being set.

For the DIC-created DV to pick up the DIC-specific RWO preference, the DV must be deleted (a DV is immutable) and recreated with the right settings from the new default StorageClass.

Signed-off-by: Noam Assouline <nassouli@redhat.com>
Pull Request #4074: Recreate DIC import DV when first default SC is set

30 of 32 new or added lines in 1 file covered. (93.75%)

21 existing lines in 4 files now uncovered.

15125 of 30414 relevant lines covered (49.73%)

0.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.98
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        authorizationv1 "k8s.io/api/authorization/v1"
37
        batchv1 "k8s.io/api/batch/v1"
38
        corev1 "k8s.io/api/core/v1"
39
        storagev1 "k8s.io/api/storage/v1"
40
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
41
        "k8s.io/apimachinery/pkg/api/meta"
42
        "k8s.io/apimachinery/pkg/api/resource"
43
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44
        "k8s.io/apimachinery/pkg/labels"
45
        "k8s.io/apimachinery/pkg/runtime"
46
        "k8s.io/apimachinery/pkg/types"
47
        "k8s.io/client-go/tools/record"
48
        openapicommon "k8s.io/kube-openapi/pkg/common"
49
        "k8s.io/utils/ptr"
50

51
        "sigs.k8s.io/controller-runtime/pkg/client"
52
        "sigs.k8s.io/controller-runtime/pkg/controller"
53
        "sigs.k8s.io/controller-runtime/pkg/event"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/manager"
56
        "sigs.k8s.io/controller-runtime/pkg/predicate"
57
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
58
        "sigs.k8s.io/controller-runtime/pkg/source"
59

60
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
61
        "kubevirt.io/containerized-data-importer/pkg/common"
62
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
63
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
64
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
65
        "kubevirt.io/containerized-data-importer/pkg/operator"
66
        "kubevirt.io/containerized-data-importer/pkg/util"
67
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
68
)
69

70
// Event reason and message emitted by shouldReconcileCron when the managed
// DataSource is already owned by another DataImportCron.
const (
71
        // ErrDataSourceAlreadyManaged is the event reason recorded when the DataSource is already managed by another DataImportCron
72
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
73
        // MessageDataSourceAlreadyManaged is the format string of that event's message; its arguments are the DataSource name and the owning DataImportCron name
74
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
75
)
76

77
// DataImportCronReconciler reconciles DataImportCron objects. It holds the
// clients, event recorder, logger and deployment settings its methods use.
78
type DataImportCronReconciler struct {
79
        // client is used for the Get/Create/Update/Delete calls made during reconcile
        client          client.Client
80
        // uncachedClient: presumably bypasses the informer cache — TODO confirm at its call sites
        uncachedClient  client.Client
81
        // recorder emits Kubernetes events on the reconciled objects
        recorder        record.EventRecorder
82
        scheme          *runtime.Scheme
83
        // log is the base logger; methods derive named/valued loggers from it
        log             logr.Logger
84
        // image and pullPolicy: presumably describe the container image of the spawned jobs/pods — TODO confirm
        image           string
85
        pullPolicy      string
86
        cdiNamespace    string
87
        installerLabels map[string]string
88
}
89

90
const (
91
        // AnnSourceDesiredDigest is the digest of the pending updated image
92
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
93
        // AnnImageStreamDockerRef is the ImageStream Docker reference
94
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
95
        // AnnNextCronTime is the next time stamp which satisfies the cron expression (RFC3339)
96
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
97
        // AnnLastCronTime is the cron last execution time stamp (RFC3339)
98
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
99
        // AnnLastUseTime is the last use time stamp (RFC3339Nano, UTC) set on the import source object (PVC or VolumeSnapshot)
100
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
101
        // AnnStorageClass is the cron DV's storage class
102
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
103

104
        // Unexported controller name, digest prefixes and length/retention limits.
        // NOTE(review): their call sites are outside this view; meanings are inferred from the names only.
        dataImportControllerName    = "dataimportcron-controller"
105
        digestSha256Prefix          = "sha256:"
106
        digestUIDPrefix             = "uid:"
107
        digestDvNameSuffixLength    = 12
108
        cronJobUIDSuffixLength      = 8
109
        defaultImportsToKeepPerCron = 3
110
)
111

112
// ErrNotManagedByCron is returned by getDataSource when the DataSource exists
// but its DataImportCron label does not match the reconciled cron's name.
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
113

114
// Reconcile loop for DataImportCronReconciler
115
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
116
        dataImportCron := &cdiv1.DataImportCron{}
1✔
117
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
118
                return reconcile.Result{}, err
1✔
119
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
120
                err := r.cleanup(ctx, req.NamespacedName)
1✔
121
                return reconcile.Result{}, err
1✔
122
        }
1✔
123
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
124
        if !shouldReconcile || err != nil {
1✔
125
                return reconcile.Result{}, err
×
126
        }
×
127

128
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
129
                return reconcile.Result{}, err
×
130
        }
×
131

132
        return r.update(ctx, dataImportCron)
1✔
133
}
134

135
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
136
        dataSource := &cdiv1.DataSource{}
1✔
137
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
138
                if k8serrors.IsNotFound(err) {
2✔
139
                        return true, nil
1✔
140
                }
1✔
141
                return false, err
×
142
        }
143
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
144
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
145
                return true, nil
1✔
146
        }
1✔
147
        otherCron := &cdiv1.DataImportCron{}
1✔
148
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
149
                if k8serrors.IsNotFound(err) {
2✔
150
                        return true, nil
1✔
151
                }
1✔
152
                return false, err
×
153
        }
154
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
155
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
156
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
157
                r.log.V(3).Info(msg)
1✔
158
                return false, nil
1✔
159
        }
1✔
160
        return true, nil
×
161
}
162

163
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
164
        if dataImportCron.Spec.Schedule == "" {
2✔
165
                return nil
1✔
166
        }
1✔
167
        if isControllerPolledSource(dataImportCron) {
2✔
168
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
169
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
170
                }
1✔
171
                return nil
1✔
172
        }
173
        if !isURLSource(dataImportCron) {
1✔
174
                return nil
×
175
        }
×
176
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
177
        if exists || err != nil {
2✔
178
                return err
1✔
179
        }
1✔
180
        cronJob, err := r.newCronJob(dataImportCron)
1✔
181
        if err != nil {
1✔
182
                return err
×
183
        }
×
184
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
185
                return err
×
186
        }
×
187
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
188
        if err != nil {
1✔
189
                return err
×
190
        }
×
191
        if err := r.client.Create(ctx, job); err != nil {
1✔
192
                return err
×
193
        }
×
194
        return nil
1✔
195
}
196

197
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
198
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
199
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
200
        }
×
201
        imageStream := &imagev1.ImageStream{}
1✔
202
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
203
        if err != nil {
2✔
204
                return nil, "", err
1✔
205
        }
1✔
206
        imageStreamNamespacedName := types.NamespacedName{
1✔
207
                Namespace: imageStreamNamespace,
1✔
208
                Name:      name,
1✔
209
        }
1✔
210
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
211
                return nil, "", err
1✔
212
        }
1✔
213
        return imageStream, tag, nil
1✔
214
}
215

216
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
217
        if imageStream == nil {
1✔
218
                return "", "", errors.Errorf("No ImageStream")
×
219
        }
×
220
        tags := imageStream.Status.Tags
1✔
221
        if len(tags) == 0 {
1✔
222
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
223
        }
×
224

225
        tagIdx := 0
1✔
226
        if len(imageStreamTag) > 0 {
2✔
227
                tagIdx = -1
1✔
228
                for i, tag := range tags {
2✔
229
                        if tag.Tag == imageStreamTag {
2✔
230
                                tagIdx = i
1✔
231
                                break
1✔
232
                        }
233
                }
234
        }
235
        if tagIdx == -1 {
2✔
236
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
237
        }
1✔
238

239
        if len(tags[tagIdx].Items) == 0 {
2✔
240
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
241
        }
1✔
242
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
243
}
244

245
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
246
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
247
                return imageStreamName, "", nil
1✔
248
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
249
                return subs[0], subs[1], nil
1✔
250
        }
1✔
251
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
252
}
253

254
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
255
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
256
        if nextTimeStr == "" {
1✔
257
                return r.setNextCronTime(dataImportCron)
×
258
        }
×
259
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
260
        if err != nil {
1✔
261
                return reconcile.Result{}, err
×
262
        }
×
263
        if nextTime.After(time.Now()) {
2✔
264
                return r.setNextCronTime(dataImportCron)
1✔
265
        }
1✔
266
        switch {
1✔
267
        case isImageStreamSource(dataImportCron):
1✔
268
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
269
                        return reconcile.Result{}, err
1✔
270
                }
1✔
271
        case isPvcSource(dataImportCron):
1✔
272
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
273
                        return reconcile.Result{}, err
1✔
274
                }
1✔
275
        case isNodePull(dataImportCron):
1✔
276
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
277
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
278
                } else if err != nil {
2✔
279
                        return reconcile.Result{}, err
×
280
                }
×
281
        }
282

283
        return r.setNextCronTime(dataImportCron)
1✔
284
}
285

286
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
287
        now := time.Now()
1✔
288
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
289
        if err != nil {
1✔
290
                return reconcile.Result{}, err
×
291
        }
×
292
        nextTime := expr.Next(now)
1✔
293
        requeueAfter := nextTime.Sub(now)
1✔
294
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
295
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
296
        return res, err
1✔
297
}
298

299
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
300
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
301
        return err == nil && regSource.ImageStream != nil
1✔
302
}
1✔
303

304
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
305
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
306
        return err == nil && regSource.URL != nil
1✔
307
}
1✔
308

309
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
310
        regSource, err := getCronRegistrySource(cron)
1✔
311
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
312
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
313
}
1✔
314

315
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
316
        if !isCronRegistrySource(cron) {
2✔
317
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
318
        }
1✔
319
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
320
}
321

322
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
323
        source := cron.Spec.Template.Spec.Source
1✔
324
        return source != nil && source.Registry != nil
1✔
325
}
1✔
326

327
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
328
        if !isPvcSource(cron) {
1✔
329
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
330
        }
×
331
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
332
}
333

334
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
335
        source := cron.Spec.Template.Spec.Source
1✔
336
        return source != nil && source.PVC != nil
1✔
337
}
1✔
338

339
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
340
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
341
}
1✔
342

343
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
344
        res := reconcile.Result{}
1✔
345

1✔
346
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
347
        if err != nil {
1✔
348
                return res, err
×
349
        }
×
350

351
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
352
        imports := dataImportCron.Status.CurrentImports
1✔
353
        importSucceeded := false
1✔
354

1✔
355
        dataVolume := dataImportCron.Spec.Template
1✔
356
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
357
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
358
        if err != nil {
1✔
359
                return res, err
×
360
        }
×
361
        if desiredStorageClass != nil {
2✔
362
                desiredSc := desiredStorageClass.Name
1✔
363
                outdated, err := r.importOutdated(ctx, dataImportCron, dv, pvc, desiredStorageClass)
1✔
364
                if err != nil {
1✔
UNCOV
365
                        return res, err
×
UNCOV
366
                }
×
367
                if outdated {
2✔
368
                        if err := r.recreateImport(ctx, dataImportCron, desiredSc); err != nil {
1✔
369
                                return res, err
×
370
                        }
×
371
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
372
                }
373
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredSc)
1✔
374
        }
375
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
376
        if err != nil {
1✔
377
                return res, err
×
378
        }
×
379
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
380
        if err != nil {
1✔
381
                return res, err
×
382
        }
×
383

384
        handlePopulatedPvc := func() error {
2✔
385
                if pvc != nil {
2✔
386
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
387
                                return err
×
388
                        }
×
389
                }
390
                importSucceeded = true
1✔
391
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
392
                        return err
×
393
                }
×
394

395
                return nil
1✔
396
        }
397

398
        switch {
1✔
399
        case dv != nil:
1✔
400
                switch dv.Status.Phase {
1✔
401
                case cdiv1.Succeeded:
1✔
402
                        if err := handlePopulatedPvc(); err != nil {
1✔
403
                                return res, err
×
404
                        }
×
405
                case cdiv1.ImportScheduled:
1✔
406
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
407
                case cdiv1.ImportInProgress:
1✔
408
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
409
                default:
1✔
410
                        dvPhase := string(dv.Status.Phase)
1✔
411
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
412
                }
413
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
414
                if err := handlePopulatedPvc(); err != nil {
1✔
415
                        return res, err
×
416
                }
×
417
        case snapshot != nil:
1✔
418
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
419
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
420
                                return res, err
×
421
                        }
×
422
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
423
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
424
                }
425
                // Check if snapshot class changed
426
                if desiredStorageClass != nil {
2✔
427
                        changed, err := r.handleSnapshotClassChange(ctx, snapshot, desiredStorageClass.Name)
1✔
428
                        if err != nil {
1✔
429
                                return res, err
×
430
                        }
×
431
                        if changed {
1✔
432
                                return reconcile.Result{RequeueAfter: time.Second}, nil
×
433
                        }
×
434
                }
435
                // Below k8s 1.29 there's no way to know the source volume mode
436
                // Let's at least expose this info on our own snapshots
437
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
438
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
439
                        if err != nil {
1✔
440
                                return res, err
×
441
                        }
×
442
                        if volMode != nil {
2✔
443
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
444
                        }
1✔
445
                }
446
                if _, ok := snapshot.Annotations[cc.AnnAdvisedRestoreSize]; !ok {
2✔
447
                        size := inferAdvisedRestoreSizeForSnapshot(&dataImportCron.Spec.Template, snapshot, nil)
1✔
448
                        if size != nil {
2✔
449
                                cc.AddAnnotation(snapshot, cc.AnnAdvisedRestoreSize, size.String())
1✔
450
                        }
1✔
451
                }
452
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
453
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
454
                if err != nil {
2✔
455
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
456
                                return res, err
×
457
                        }
×
458
                } else {
1✔
459
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
460
                }
1✔
461
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
462
                        return res, err
×
463
                }
×
464
                importSucceeded = true
1✔
465
        default:
1✔
466
                if len(imports) > 0 {
2✔
467
                        imports = imports[1:]
1✔
468
                        dataImportCron.Status.CurrentImports = imports
1✔
469
                }
1✔
470
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
471
        }
472

473
        if importSucceeded {
2✔
474
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
475
                        return res, err
×
476
                }
×
477
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
478
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
479
                        return res, err
×
480
                }
×
481
        }
482

483
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
484
                return res, err
×
485
        }
×
486

487
        // Skip if schedule is disabled
488
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
489
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
490
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
491
                if err != nil {
2✔
492
                        return pollRes, err
1✔
493
                }
1✔
494
                res = pollRes
1✔
495
        }
496

497
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
498
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
499
        if digestUpdated {
2✔
500
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
501
                if dv != nil {
1✔
502
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
503
                                return res, err
×
504
                        }
×
505
                }
506
                if importSucceeded || len(imports) == 0 {
2✔
507
                        if err := r.createImportDataVolume(ctx, dataImportCron, desiredStorageClass); err != nil {
2✔
508
                                return res, err
1✔
509
                        }
1✔
510
                }
511
        } else if importSucceeded {
2✔
512
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
513
                        return res, err
×
514
                }
×
515
        } else if len(imports) > 0 {
2✔
516
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
517
        } else {
2✔
518
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
519
        }
1✔
520

521
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
522
                return res, err
×
523
        }
×
524

525
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
526
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
527
                        return res, err
×
528
                }
×
529
        }
530
        return res, nil
1✔
531
}
532

533
// Returns the current import DV if exists, and the last imported PVC
534
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
535
        imports := cron.Status.CurrentImports
1✔
536
        if len(imports) == 0 {
2✔
537
                return nil, nil, nil
1✔
538
        }
1✔
539

540
        dvName := imports[0].DataVolumeName
1✔
541
        dv := &cdiv1.DataVolume{}
1✔
542
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
543
                if !k8serrors.IsNotFound(err) {
1✔
544
                        return nil, nil, err
×
545
                }
×
546
                dv = nil
1✔
547
        }
548

549
        pvc := &corev1.PersistentVolumeClaim{}
1✔
550
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
551
                if !k8serrors.IsNotFound(err) {
1✔
552
                        return nil, nil, err
×
553
                }
×
554
                pvc = nil
1✔
555
        }
556
        return dv, pvc, nil
1✔
557
}
558

559
// Returns the current import DV if exists, and the last imported PVC
560
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
561
        imports := cron.Status.CurrentImports
1✔
562
        if len(imports) == 0 {
2✔
563
                return nil, nil
1✔
564
        }
1✔
565

566
        snapName := imports[0].DataVolumeName
1✔
567
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
568
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
569
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
570
                        return nil, err
×
571
                }
×
572
                return nil, nil
1✔
573
        }
574

575
        return snapshot, nil
1✔
576
}
577

578
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
579
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
580
        dataSource := &cdiv1.DataSource{}
1✔
581
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
582
                return nil, err
1✔
583
        }
1✔
584
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
585
                log := r.log.WithName("getCronManagedDataSource")
×
586
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
587
                return nil, ErrNotManagedByCron
×
588
        }
×
589
        return dataSource, nil
1✔
590
}
591

592
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
593
        objCopy := obj.DeepCopyObject()
1✔
594
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
595
        r.setDataImportCronResourceLabels(cron, obj)
1✔
596
        if !reflect.DeepEqual(obj, objCopy) {
2✔
597
                if err := r.client.Update(ctx, obj); err != nil {
1✔
598
                        return err
×
599
                }
×
600
        }
601
        return nil
1✔
602
}
603

604
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
605
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
606
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
607
                if cond.Status == corev1.ConditionFalse &&
×
608
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
609
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
610
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
611
                        dv.Labels[common.DataImportCronLabel] = ""
×
612
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
613
                                return err
×
614
                        }
×
615
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
616
                                return err
×
617
                        }
×
618
                        cron.Status.CurrentImports = nil
×
619
                }
620
        }
621
        return nil
×
622
}
623

624
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
625
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
626
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
627
        if err != nil {
1✔
628
                return err
×
629
        }
×
630
        if regSource.ImageStream == nil {
1✔
631
                return nil
×
632
        }
×
633
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
634
        if err != nil {
2✔
635
                return err
1✔
636
        }
1✔
637
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
638
        if err != nil {
2✔
639
                return err
1✔
640
        }
1✔
641
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
642
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
643
                log.Info("Updating DataImportCron", "digest", digest)
1✔
644
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
645
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
646
        }
1✔
647
        return nil
1✔
648
}
649

650
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
651
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
652
        podName := getPollerPodName(cron)
1✔
653
        ns := cron.Namespace
1✔
654
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
655
        pod := &corev1.Pod{}
1✔
656

1✔
657
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
658
                digest, err := fetchContainerImageDigest(pod)
1✔
659
                if err != nil || digest == "" {
1✔
660
                        return false, err
×
661
                }
×
662
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
663
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
664
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
665
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
666
                }
1✔
667
                return true, r.client.Delete(ctx, pod)
1✔
668
        } else if cc.IgnoreNotFound(err) != nil {
1✔
669
                return false, err
×
670
        }
×
671

672
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
673
        if err != nil {
1✔
674
                return false, err
×
675
        }
×
676
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
1✔
677
        if platform != nil && platform.Architecture != "" {
1✔
678
                if workloadNodePlacement.NodeSelector == nil {
×
679
                        workloadNodePlacement.NodeSelector = map[string]string{}
×
680
                }
×
681
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
682
        }
683

684
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
685

1✔
686
        pod = &corev1.Pod{
1✔
687
                ObjectMeta: metav1.ObjectMeta{
1✔
688
                        Name:      podName,
1✔
689
                        Namespace: ns,
1✔
690
                        OwnerReferences: []metav1.OwnerReference{
1✔
691
                                {
1✔
692
                                        APIVersion:         cron.APIVersion,
1✔
693
                                        Kind:               cron.Kind,
1✔
694
                                        Name:               cron.Name,
1✔
695
                                        UID:                cron.UID,
1✔
696
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
697
                                        Controller:         ptr.To[bool](true),
1✔
698
                                },
1✔
699
                        },
1✔
700
                },
1✔
701
                Spec: corev1.PodSpec{
1✔
702
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
703
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
704
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
705
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
706
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
707
                        Volumes: []corev1.Volume{
1✔
708
                                {
1✔
709
                                        Name: "shared-volume",
1✔
710
                                        VolumeSource: corev1.VolumeSource{
1✔
711
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
712
                                        },
1✔
713
                                },
1✔
714
                        },
1✔
715
                        InitContainers: []corev1.Container{
1✔
716
                                {
1✔
717
                                        Name:                     "init",
1✔
718
                                        Image:                    r.image,
1✔
719
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
720
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
721
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
722
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
723
                                },
1✔
724
                        },
1✔
725
                        Containers: []corev1.Container{
1✔
726
                                {
1✔
727
                                        Name:                     "image-container",
1✔
728
                                        Image:                    containerImage,
1✔
729
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
730
                                        Command:                  []string{"/shared/server", "-h"},
1✔
731
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
732
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
733
                                },
1✔
734
                        },
1✔
735
                },
1✔
736
        }
1✔
737

1✔
738
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
739
        if pod.Spec.SecurityContext != nil {
2✔
740
                pod.Spec.SecurityContext.FSGroup = nil
1✔
741
        }
1✔
742

743
        return false, r.client.Create(ctx, pod)
1✔
744
}
745

746
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
747
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
748
                return "", nil
×
749
        }
×
750

751
        status := pod.Status.ContainerStatuses[0]
1✔
752
        if status.State.Waiting != nil {
1✔
753
                reason := status.State.Waiting.Reason
×
754
                switch reason {
×
755
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
756
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
757
                }
758
                return "", nil
×
759
        }
760

761
        if status.State.Terminated == nil {
1✔
762
                return "", nil
×
763
        }
×
764

765
        imageID := status.ImageID
1✔
766
        if imageID == "" {
1✔
767
                return "", errors.Errorf("Container has no imageID")
×
768
        }
×
769
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
770
        if idx < 0 {
1✔
771
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
772
        }
×
773

774
        return imageID[idx:], nil
1✔
775
}
776

777
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
778
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
779
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
780
        if err != nil {
1✔
781
                return err
×
782
        }
×
783
        ns := pvcSource.Namespace
1✔
784
        if ns == "" {
2✔
785
                ns = dataImportCron.Namespace
1✔
786
        }
1✔
787
        pvc := &corev1.PersistentVolumeClaim{}
1✔
788
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
789
                return err
1✔
790
        }
1✔
791
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
792
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
793
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
794
                log.Info("Updating DataImportCron", "digest", digest)
1✔
795
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
796
        }
1✔
797
        return nil
1✔
798
}
799

800
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
801
        log := r.log.WithName("updateDataSource")
1✔
802
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
803
        if err != nil {
2✔
804
                if k8serrors.IsNotFound(err) {
2✔
805
                        dataSource = r.newDataSource(dataImportCron)
1✔
806
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
807
                                return err
×
808
                        }
×
809
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
810
                } else if errors.Is(err, ErrNotManagedByCron) {
×
811
                        return nil
×
812
                } else {
×
813
                        return err
×
814
                }
×
815
        }
816
        dataSourceCopy := dataSource.DeepCopy()
1✔
817
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
818

1✔
819
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
820
        populateDataSource(format, dataSource, sourcePVC)
1✔
821

1✔
822
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
823
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
824
                        return err
×
825
                }
×
826
        }
827

828
        return nil
1✔
829
}
830

831
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
832
        if sourcePVC == nil {
2✔
833
                return
1✔
834
        }
1✔
835

836
        switch format {
1✔
837
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
838
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
839
                        PVC: sourcePVC,
1✔
840
                }
1✔
841
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
842
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
843
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
844
                                Namespace: sourcePVC.Namespace,
1✔
845
                                Name:      sourcePVC.Name,
1✔
846
                        },
1✔
847
                }
1✔
848
        }
849
}
850

851
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
852
        if dataImportCron.Status.CurrentImports == nil {
1✔
853
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
854
        }
×
855
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
856
                Namespace: dataImportCron.Namespace,
1✔
857
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
858
        }
1✔
859
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
860
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
861
                now := metav1.Now()
1✔
862
                dataImportCron.Status.LastImportTimestamp = &now
1✔
863
        }
1✔
864
        return nil
1✔
865
}
866

867
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
868
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
869
        if lastTimeStr == "" {
2✔
870
                return nil
1✔
871
        }
1✔
872
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
873
        if err != nil {
1✔
874
                return err
×
875
        }
×
876
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
877
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
878
        }
1✔
879
        return nil
1✔
880
}
881

882
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass *storagev1.StorageClass) error {
1✔
883
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
884
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
885
        if digest == "" {
1✔
886
                return nil
×
887
        }
×
888
        dvName, err := createDvName(dataSourceName, digest)
1✔
889
        if err != nil {
2✔
890
                return err
1✔
891
        }
1✔
892

893
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
894
        for _, src := range sources {
2✔
895
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
896
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
897
                                return err
×
898
                        }
×
899
                } else {
1✔
900
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
901
                                return err
×
902
                        }
×
903
                        // If source exists don't create DV
904
                        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
905
                        return nil
1✔
906
                }
907
        }
908

909
        storageProfile := &cdiv1.StorageProfile{}
1✔
910
        if desiredStorageClass != nil {
2✔
911
                if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
912
                        return err
×
913
                }
×
914
        }
915
        dv := r.newSourceDataVolume(dataImportCron, dvName, storageProfile)
1✔
916
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
1✔
917
                return err
×
918
        } else if !allowed {
2✔
919
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
1✔
920
                        "Not authorized to create DataVolume", notAuthorized)
1✔
921
                return nil
1✔
922
        }
1✔
923
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
924
                return err
×
925
        }
×
926
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
927

1✔
928
        return nil
1✔
929
}
930

931
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
1✔
932
        if !isPvcSource(dataImportCron) {
2✔
933
                return true, nil
1✔
934
        }
1✔
935
        saName := "default"
1✔
936
        if dataImportCron.Spec.ServiceAccountName != nil {
1✔
937
                saName = *dataImportCron.Spec.ServiceAccountName
×
938
        }
×
939
        if resp, err := dv.AuthorizeSA(dv.Namespace, dv.Name, &authProxy{r.client}, dataImportCron.Namespace, saName); err != nil {
1✔
940
                return false, err
×
941
        } else if !resp.Allowed {
2✔
942
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
1✔
943
                return false, nil
1✔
944
        }
1✔
945

946
        return true, nil
1✔
947
}
948

949
type authProxy struct {
950
        client client.Client
951
}
952

953
func (p *authProxy) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
1✔
954
        if err := p.client.Create(context.TODO(), sar); err != nil {
1✔
955
                return nil, err
×
956
        }
×
957
        return sar, nil
1✔
958
}
959

960
func (p *authProxy) GetNamespace(name string) (*corev1.Namespace, error) {
1✔
961
        ns := &corev1.Namespace{}
1✔
962
        if err := p.client.Get(context.TODO(), types.NamespacedName{Name: name}, ns); err != nil {
1✔
963
                return nil, err
×
964
        }
×
965
        return ns, nil
1✔
966
}
967

968
func (p *authProxy) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
×
969
        das := &cdiv1.DataSource{}
×
970
        if err := p.client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, das); err != nil {
×
971
                return nil, err
×
972
        }
×
973
        return das, nil
×
974
}
975

976
func (r *DataImportCronReconciler) recreateImport(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
977
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
978
        if !ok {
1✔
979
                // nothing to delete
×
980
                return nil
×
981
        }
×
982
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
983
        if err != nil {
1✔
984
                return err
×
985
        }
×
986
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
987
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
988
        for _, src := range sources {
2✔
989
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
990
                        return err
×
991
                }
×
992
        }
993
        for _, src := range sources {
2✔
994
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
995
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
996
                }
×
997
        }
998
        // Only update desired storage class once garbage collection went through
999
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
1000
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
1001
        if err != nil {
1✔
1002
                return err
×
1003
        }
×
1004

1005
        return nil
1✔
1006
}
1007

1008
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
1009
        switch format {
1✔
1010
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1011
                return nil
1✔
1012
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1013
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
1014
        default:
×
1015
                return fmt.Errorf("unknown source format for snapshot")
×
1016
        }
1017
}
1018

1019
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
1020
        if pvc == nil {
1✔
1021
                return nil
×
1022
        }
×
1023
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
1024
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
1025
                return nil
1✔
1026
        }
1✔
1027
        storageProfile := &cdiv1.StorageProfile{}
1✔
1028
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1029
                return err
×
1030
        }
×
1031

1032
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
1033
                ObjectMeta: metav1.ObjectMeta{
1✔
1034
                        Name:      pvc.Name,
1✔
1035
                        Namespace: dataImportCron.Namespace,
1✔
1036
                        Labels: map[string]string{
1✔
1037
                                common.CDILabelKey:       common.CDILabelValue,
1✔
1038
                                common.CDIComponentLabel: "",
1✔
1039
                        },
1✔
1040
                },
1✔
1041
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1042
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1043
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1044
                        },
1✔
1045
                },
1✔
1046
        }
1✔
1047
        // Select VolumeSnapshotClass for boot source snapshot
1✔
1048
        snapshotClassName, err := r.getSnapshotClassForDataImportCron(pvc, storageProfile)
1✔
1049
        if err != nil {
1✔
1050
                return err
×
1051
        }
×
1052
        if snapshotClassName != "" {
2✔
1053
                desiredSnapshot.Spec.VolumeSnapshotClassName = &snapshotClassName
1✔
1054
        }
1✔
1055
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
1056
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
1057

1✔
1058
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
1059
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
1060
                if !k8serrors.IsNotFound(err) {
1✔
1061
                        return err
×
1062
                }
×
1063
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1064
                pvcSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
1✔
1065
                size := inferAdvisedRestoreSizeForSnapshot(&dataImportCron.Spec.Template, desiredSnapshot, &pvcSize)
1✔
1066
                if size != nil {
2✔
1067
                        cc.AddAnnotation(desiredSnapshot, cc.AnnAdvisedRestoreSize, size.String())
1✔
1068
                }
1✔
1069
                if pvc.Spec.VolumeMode != nil {
2✔
1070
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
1071
                }
1✔
1072
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
1073
                        return err
×
1074
                }
×
1075
        } else {
1✔
1076
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
1077
                        // Clean up DV/PVC as they are not needed anymore
1✔
1078
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1079
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
1080
                                return err
×
1081
                        }
×
1082
                }
1083
        }
1084

1085
        return nil
1✔
1086
}
1087

1088
func (r *DataImportCronReconciler) handleSnapshotClassChange(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot, storageClassName string) (bool, error) {
1✔
1089
        sp := &cdiv1.StorageProfile{}
1✔
1090
        if err := r.client.Get(ctx, types.NamespacedName{Name: storageClassName}, sp); err != nil {
1✔
1091
                return false, client.IgnoreNotFound(err)
×
1092
        }
×
1093

1094
        desiredVSC, err := r.getSnapshotClassForDataImportCron(nil, sp)
1✔
1095
        if err != nil {
1✔
1096
                return false, err
×
1097
        }
×
1098
        actualVSC := ""
1✔
1099
        if snapshot.Spec.VolumeSnapshotClassName != nil {
1✔
1100
                actualVSC = *snapshot.Spec.VolumeSnapshotClassName
×
1101
        }
×
1102
        if desiredVSC == "" || actualVSC == desiredVSC {
2✔
1103
                return false, nil
1✔
1104
        }
1✔
1105

1106
        r.log.Info("Snapshot class changed, deleting", "name", snapshot.Name, "from", actualVSC, "to", desiredVSC)
×
1107
        if err := r.client.Delete(ctx, snapshot); err != nil {
×
1108
                return false, client.IgnoreNotFound(err)
×
1109
        }
×
1110
        return true, nil
×
1111
}
1112

1113
func (r *DataImportCronReconciler) importOutdated(ctx context.Context, cron *cdiv1.DataImportCron, currentDV *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, desiredSC *storagev1.StorageClass) (bool, error) {
1✔
1114
        currentSc, hasCurrent := cron.Annotations[AnnStorageClass]
1✔
1115
        scChanged := currentSc != desiredSC.Name
1✔
1116
        needsReset := hasCurrent || len(cron.Status.CurrentImports) > 0
1✔
1117
        if scChanged && needsReset {
2✔
1118
                r.log.Info("Storage class changed, resetting import", "currentSc", currentSc, "desiredSc", desiredSC.Name)
1✔
1119
                return true, nil
1✔
1120
        }
1✔
1121

1122
        if pvc != nil && pvc.Status.Phase == corev1.ClaimPending &&
1✔
1123
                pvc.Labels[common.DataImportCronLabel] == cron.Name &&
1✔
1124
                pvc.Spec.StorageClassName != nil && *pvc.Spec.StorageClassName != desiredSC.Name {
2✔
1125
                r.log.Info("Pending PVC has outdated storage class, resetting import", "pvc", pvc.Name, "pvcSc", *pvc.Spec.StorageClassName, "desiredSc", desiredSC.Name)
1✔
1126
                return true, nil
1✔
1127
        }
1✔
1128

1129
        if currentDV != nil {
2✔
1130
                sp := &cdiv1.StorageProfile{}
1✔
1131
                if err := r.client.Get(ctx, types.NamespacedName{Name: desiredSC.Name}, sp); err != nil {
1✔
NEW
1132
                        return false, err
×
NEW
1133
                }
×
1134
                desiredDV := r.newSourceDataVolume(cron, currentDV.Name, sp)
1✔
1135
                if !reflect.DeepEqual(currentDV.Spec, desiredDV.Spec) {
2✔
1136
                        r.log.Info("Import DV spec changed, resetting import", "dv", currentDV.Name)
1✔
1137
                        return true, nil
1✔
1138
                }
1✔
1139
        }
1140

1141
        return false, nil
1✔
1142
}
1143

1144
// getSnapshotClassForDataImportCron returns the VolumeSnapshotClass name to use for DataImportCron snapshots.
1145
func (r *DataImportCronReconciler) getSnapshotClassForDataImportCron(pvc *corev1.PersistentVolumeClaim, storageProfile *cdiv1.StorageProfile) (string, error) {
1✔
1146
        if vscName := storageProfile.Annotations[cc.AnnSnapshotClassForDataImportCron]; vscName != "" {
2✔
1147
                return vscName, nil
1✔
1148
        }
1✔
1149
        return cc.GetSnapshotClassForSmartClone(pvc, &storageProfile.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
1150
}
1151

1152
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1153
        dataImportCron.Status.SourceFormat = &format
1✔
1154

1✔
1155
        switch format {
1✔
1156
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1157
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1158
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1159
                if snapshot == nil {
2✔
1160
                        // Snapshot create/update will trigger reconcile
1✔
1161
                        return nil
1✔
1162
                }
1✔
1163
                if cc.IsSnapshotReady(snapshot) {
2✔
1164
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1165
                } else {
2✔
1166
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1167
                }
1✔
1168
        default:
×
1169
                return fmt.Errorf("unknown source format for snapshot")
×
1170
        }
1171

1172
        return nil
1✔
1173
}
1174

1175
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1176
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1177
        if desiredStorageClass == nil {
2✔
1178
                return format, nil
1✔
1179
        }
1✔
1180

1181
        storageProfile := &cdiv1.StorageProfile{}
1✔
1182
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1183
                return format, err
×
1184
        }
×
1185
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1186
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1187
        }
1✔
1188

1189
        return format, nil
1✔
1190
}
1191

1192
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1193
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1194
                return nil
×
1195
        }
×
1196
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1197
        if err != nil {
1✔
1198
                return err
×
1199
        }
×
1200

1201
        maxImports := defaultImportsToKeepPerCron
1✔
1202

1✔
1203
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1204
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1205
        }
1✔
1206

1207
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1208
                return err
×
1209
        }
×
1210
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
1211
                return err
×
1212
        }
×
1213

1214
        return nil
1✔
1215
}
1216

1217
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1218
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1219

1✔
1220
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1221
                return err
×
1222
        }
×
1223
        if len(pvcList.Items) > maxImports {
2✔
1224
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1225
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1226
                })
1✔
1227
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1228
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1229
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1230
                                return err
×
1231
                        }
×
1232
                }
1233
        }
1234

1235
        dvList := &cdiv1.DataVolumeList{}
1✔
1236
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1237
                return err
×
1238
        }
×
1239

1240
        if len(dvList.Items) > maxImports {
2✔
1241
                for _, dv := range dvList.Items {
2✔
1242
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1243
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
1244
                                return err
×
1245
                        }
×
1246

1247
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1248
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1249
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
1250
                                        return err
×
1251
                                }
×
1252
                        }
1253
                }
1254
        }
1255

1256
        return nil
1✔
1257
}
1258

1259
// deleteDvPvc deletes DV or PVC if DV was GCed
1260
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1261
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1262
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1263
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1264
                return err
1✔
1265
        }
1✔
1266
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1267
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1268
                return err
×
1269
        }
×
1270
        return nil
1✔
1271
}
1272

1273
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1274
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1275

1✔
1276
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1277
                if meta.IsNoMatchError(err) {
×
1278
                        return nil
×
1279
                }
×
1280
                return err
×
1281
        }
1282
        if len(snapList.Items) > maxImports {
1✔
1283
                sort.Slice(snapList.Items, func(i, j int) bool {
×
1284
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1285
                })
×
1286
                for _, snap := range snapList.Items[maxImports:] {
×
1287
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
1288
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1289
                                return err
×
1290
                        }
×
1291
                }
1292
        }
1293

1294
        return nil
1✔
1295
}
1296

1297
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1298
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1299
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1300

1✔
1301
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1302
                return err
×
1303
        }
×
1304
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1305
        if err != nil {
1✔
1306
                return err
×
1307
        }
×
1308
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1309
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1310
                return err
×
1311
        }
×
1312
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1313
                return err
×
1314
        }
×
1315
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1316
                return err
×
1317
        }
×
1318
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1319
                return err
×
1320
        }
×
1321
        return nil
1✔
1322
}
1323

1324
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1325
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1326
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1327
        if err != nil {
1✔
1328
                return err
×
1329
        }
×
1330
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1331
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1332
                return err
×
1333
        }
×
1334
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1335
                return err
×
1336
        }
×
1337

1338
        return nil
1✔
1339
}
1340

1341
// NewDataImportCronController creates a new instance of the DataImportCron controller
1342
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1343
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1344
                Scheme: mgr.GetScheme(),
×
1345
                Mapper: mgr.GetRESTMapper(),
×
1346
        })
×
1347
        if err != nil {
×
1348
                return nil, err
×
1349
        }
×
1350
        reconciler := &DataImportCronReconciler{
×
1351
                client:          mgr.GetClient(),
×
1352
                uncachedClient:  uncachedClient,
×
1353
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1354
                scheme:          mgr.GetScheme(),
×
1355
                log:             log.WithName(dataImportControllerName),
×
1356
                image:           importerImage,
×
1357
                pullPolicy:      pullPolicy,
×
1358
                cdiNamespace:    util.GetNamespace(),
×
1359
                installerLabels: installerLabels,
×
1360
        }
×
1361
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1362
                MaxConcurrentReconciles: 3,
×
1363
                Reconciler:              reconciler,
×
1364
        })
×
1365
        if err != nil {
×
1366
                return nil, err
×
1367
        }
×
1368
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1369
                return nil, err
×
1370
        }
×
1371
        log.Info("Initialized DataImportCron controller")
×
1372
        return dataImportCronController, nil
×
1373
}
1374

1375
func getCronName(obj client.Object) string {
×
1376
        return obj.GetLabels()[common.DataImportCronLabel]
×
1377
}
×
1378

1379
func getCronNs(obj client.Object) string {
×
1380
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1381
}
×
1382

1383
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1384
        if cronName := getCronName(obj); cronName != "" {
×
1385
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1386
        }
×
1387
        return nil
×
1388
}
1389

1390
// addDataImportCronControllerWatches registers every watch of the DataImportCron controller:
// DataImportCrons themselves, plus DataVolumes, DataSources, PVCs, StorageClasses,
// StorageProfiles, CronJobs and (when the kind can be listed) VolumeSnapshots, each mapped
// back to the DataImportCron(s) to reconcile. The first watch registration error is returned.
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
        // Every DataImportCron event enqueues the object itself.
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
                return err
        }

        // mapStorageProfileToCron enqueues the crons whose resolved template storage class
        // has the same name as the storage profile.
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
                // Otherwise we risk losing the storage profile event
                var crons cdiv1.DataImportCronList
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
                        return nil
                }
                // Storage profiles are 1:1 to storage classes
                scName := obj.GetName()
                var reqs []reconcile.Request
                for _, cron := range crons.Items {
                        dataVolume := cron.Spec.Template
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
                        if err != nil || templateSc == nil {
                                // NOTE: this stops at the first cron whose storage class cannot be
                                // resolved; only the requests collected so far are returned.
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
                                return reqs
                        }
                        if templateSc.Name == scName {
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
                        }
                }
                return reqs
        }

        // DataVolumes: creates are ignored; updates and deletes are handled only for
        // objects carrying the cron label.
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
                predicate.TypedFuncs[*cdiv1.DataVolume]{
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
                },
        )); err != nil {
                return err
        }

        // DataSources: same filtering as DataVolumes.
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
                predicate.TypedFuncs[*cdiv1.DataSource]{
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
                },
        )); err != nil {
                return err
        }

        // PVCs: only deletes of cron-labeled PVCs are handled.
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
                },
        )); err != nil {
                return err
        }

        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
                return err
        }

        // StorageProfiles: every create is handled, deletes never, and updates only when
        // dicRelevantFieldsChanged reports a change.
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
                                return dicRelevantFieldsChanged(e.ObjectOld, e.ObjectNew)
                        },
                },
        )); err != nil {
                return err
        }

        // mapCronJobToCron builds the cron key from the namespace and name labels of the CronJob.
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
        }

        // CronJobs: only creates of CronJobs carrying both cron labels are handled.
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
                predicate.TypedFuncs[*batchv1.CronJob]{
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
                        },
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
                },
        )); err != nil {
                return err
        }

        // Probe whether VolumeSnapshots can be listed before watching them. A no-match error
        // skips the snapshot watch entirely; a cache-not-started error is tolerated and the
        // watch is still attempted; any other error is returned.
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
                if meta.IsNoMatchError(err) {
                        // Back out if there's no point to attempt watch
                        return nil
                }
                if !cc.IsErrCacheNotStarted(err) {
                        return err
                }
        }
        // VolumeSnapshots: only deletes of cron-labeled snapshots are handled.
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
                },
        )); err != nil {
                return err
        }

        return nil
}
1510

1511
func dicRelevantFieldsChanged(oldSp, newSp *cdiv1.StorageProfile) bool {
×
1512
        sourceFormatChanged := oldSp.Status.DataImportCronSourceFormat != newSp.Status.DataImportCronSourceFormat
×
1513
        rwoAnnotationChanged := oldSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] != newSp.Annotations[cc.AnnUseReadWriteOnceForDataImportCron]
×
1514
        snapshotClassChanged := oldSp.Annotations[cc.AnnSnapshotClassForDataImportCron] != newSp.Annotations[cc.AnnSnapshotClassForDataImportCron]
×
1515
        return sourceFormatChanged || rwoAnnotationChanged || snapshotClassChanged
×
1516
}
×
1517

1518
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1519
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1520
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1521
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1522
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1523
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1524
                                log.Info("Update", "sc", obj.GetName(),
×
1525
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1526
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1527
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1528
                                if err != nil {
×
1529
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1530
                                }
×
1531
                                return reqs
×
1532
                        },
1533
                ),
1534
                predicate.TypedFuncs[*storagev1.StorageClass]{
1535
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1536
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1537
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1538
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1539
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1540
                        },
×
1541
                },
1542
        )); err != nil {
×
1543
                return err
×
1544
        }
×
1545

1546
        return nil
×
1547
}
1548

1549
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1550
        dicList := &cdiv1.DataImportCronList{}
×
1551
        if err := c.List(ctx, dicList); err != nil {
×
1552
                return nil, err
×
1553
        }
×
1554
        reqs := []reconcile.Request{}
×
1555
        for _, dic := range dicList.Items {
×
1556
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1557
                        continue
×
1558
                }
1559

1560
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1561
        }
1562

1563
        return reqs, nil
×
1564
}
1565

1566

1567
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1568
        cronJob := &batchv1.CronJob{}
1✔
1569
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1570
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1571
                return false, cc.IgnoreNotFound(err)
1✔
1572
        }
1✔
1573

1574
        cronJobCopy := cronJob.DeepCopy()
1✔
1575
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1576
                return false, err
×
1577
        }
×
1578

1579
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1580
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1581
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1582
                        return false, cc.IgnoreNotFound(err)
×
1583
                }
×
1584
        }
1585
        return true, nil
1✔
1586
}
1587

1588
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1589
        cronJob := &batchv1.CronJob{
1✔
1590
                ObjectMeta: metav1.ObjectMeta{
1✔
1591
                        Name:      GetCronJobName(cron),
1✔
1592
                        Namespace: r.cdiNamespace,
1✔
1593
                },
1✔
1594
        }
1✔
1595
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1596
                return nil, err
×
1597
        }
×
1598
        return cronJob, nil
1✔
1599
}
1600

1601
// InitPollerPod inits poller Pod
1602
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1603
        regSource, err := getCronRegistrySource(cron)
1✔
1604
        if err != nil {
1✔
1605
                return err
×
1606
        }
×
1607
        if regSource.URL == nil {
1✔
1608
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1609
        }
×
1610
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1611
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1612
                return err
×
1613
        }
×
1614
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1615
        if err != nil {
1✔
1616
                return err
×
1617
        }
×
1618
        container := corev1.Container{
1✔
1619
                Name:  "cdi-source-update-poller",
1✔
1620
                Image: image,
1✔
1621
                Command: []string{
1✔
1622
                        "/usr/bin/cdi-source-update-poller",
1✔
1623
                        "-ns", cron.Namespace,
1✔
1624
                        "-cron", cron.Name,
1✔
1625
                        "-url", *regSource.URL,
1✔
1626
                },
1✔
1627
                ImagePullPolicy:          pullPolicy,
1✔
1628
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1629
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1630
        }
1✔
1631

1✔
1632
        var volumes []corev1.Volume
1✔
1633
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1634
        if hasCertConfigMap {
1✔
1635
                vm := corev1.VolumeMount{
×
1636
                        Name:      CertVolName,
×
1637
                        MountPath: common.ImporterCertDir,
×
1638
                }
×
1639
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1640
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1641
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1642
        }
×
1643

1644
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1645
                vm := corev1.VolumeMount{
1✔
1646
                        Name:      ProxyCertVolName,
1✔
1647
                        MountPath: common.ImporterProxyCertDir,
1✔
1648
                }
1✔
1649
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1650
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1651
        }
1✔
1652

1653
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1654
                container.Env = append(container.Env,
×
1655
                        corev1.EnvVar{
×
1656
                                Name: common.ImporterAccessKeyID,
×
1657
                                ValueFrom: &corev1.EnvVarSource{
×
1658
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1659
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1660
                                                        Name: *regSource.SecretRef,
×
1661
                                                },
×
1662
                                                Key: common.KeyAccess,
×
1663
                                        },
×
1664
                                },
×
1665
                        },
×
1666
                        corev1.EnvVar{
×
1667
                                Name: common.ImporterSecretKey,
×
1668
                                ValueFrom: &corev1.EnvVarSource{
×
1669
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1670
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1671
                                                        Name: *regSource.SecretRef,
×
1672
                                                },
×
1673
                                                Key: common.KeySecret,
×
1674
                                        },
×
1675
                                },
×
1676
                        },
×
1677
                )
×
1678
        }
×
1679

1680
        addEnvVar := func(varName, value string) {
2✔
1681
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1682
        }
1✔
1683

1684
        if insecureTLS {
1✔
1685
                addEnvVar(common.InsecureTLSVar, "true")
×
1686
        }
×
1687

1688
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1689
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1690
                        addEnvVar(varName, value)
1✔
1691
                }
1✔
1692
        }
1693

1694
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1695
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1696
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1697

1✔
1698
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1699
        if err != nil {
1✔
1700
                return err
×
1701
        }
×
1702
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1703
        if err != nil {
1✔
1704
                return err
×
1705
        }
×
1706

1707
        podSpec := &pod.Spec
1✔
1708

1✔
1709
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1710
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1711
        podSpec.Containers = []corev1.Container{container}
1✔
1712
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1713
        podSpec.Volumes = volumes
1✔
1714
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1715
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1716
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1717
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1718

1✔
1719
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1720
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1721
        if podSpec.SecurityContext != nil {
2✔
1722
                podSpec.SecurityContext.FSGroup = nil
1✔
1723
        }
1✔
1724
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1725
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1726
        }
1✔
1727

1728
        if pod.Labels == nil {
2✔
1729
                pod.Labels = map[string]string{}
1✔
1730
        }
1✔
1731
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1732

1✔
1733
        return nil
1✔
1734
}
1735

1736
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1737
        cronJobSpec := &cronJob.Spec
1✔
1738
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1739
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1740
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1741
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1742

1✔
1743
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1744
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1745
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1746
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1747

1✔
1748
        pod := &jobSpec.Template
1✔
1749
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1750
                return err
×
1751
        }
×
1752
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1753
                return err
×
1754
        }
×
1755
        return nil
1✔
1756
}
1757

1758
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1759
        job := &batchv1.Job{
1✔
1760
                ObjectMeta: metav1.ObjectMeta{
1✔
1761
                        Name:      GetInitialJobName(cron),
1✔
1762
                        Namespace: cronJob.Namespace,
1✔
1763
                },
1✔
1764
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1765
        }
1✔
1766
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1767
                return nil, err
×
1768
        }
×
1769
        return job, nil
1✔
1770
}
1771

1772
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1773
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1774
                return err
×
1775
        }
×
1776
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1777
        labels := obj.GetLabels()
1✔
1778
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1779
        labels[common.DataImportCronLabel] = cron.Name
1✔
1780
        obj.SetLabels(labels)
1✔
1781
        return nil
1✔
1782
}
1783

1784
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string, storageProfile *cdiv1.StorageProfile) *cdiv1.DataVolume {
1✔
1785
        dv := cron.Spec.Template.DeepCopy()
1✔
1786
        if isCronRegistrySource(cron) {
2✔
1787
                var digestedURL string
1✔
1788
                if isURLSource(cron) {
2✔
1789
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1790
                } else if isImageStreamSource(cron) {
3✔
1791
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1792
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1793
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1794
                }
1✔
1795
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1796
        }
1797
        dv.Name = dataVolumeName
1✔
1798
        dv.Namespace = cron.Namespace
1✔
1799
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1800
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1801
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1802
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1803

1✔
1804
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1805
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1806
        }
1✔
1807

1808
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1809

1✔
1810
        // Apply RWO access mode as default for DataImportCron (from StorageProfile annotation)
1✔
1811
        // Only applies if the DV doesn't already have AccessModes configured
1✔
1812
        if storageProfile != nil && storageProfile.Annotations[cc.AnnUseReadWriteOnceForDataImportCron] == "true" {
2✔
1813
                if dv.Spec.Storage != nil && len(dv.Spec.Storage.AccessModes) == 0 {
2✔
1814
                        dv.Spec.Storage.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
1✔
1815
                }
1✔
1816
        }
1817

1818
        return dv
1✔
1819
}
1820

1821
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1822
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1823
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
1✔
1824
        labels := obj.GetLabels()
1✔
1825
        labels[common.DataImportCronLabel] = cron.Name
1✔
1826
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1827
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1828
        }
1✔
1829
        obj.SetLabels(labels)
1✔
1830
}
1831

1832
func untagDigestedDockerURL(dockerURL string) string {
1✔
1833
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1834
                url := u.Host + u.Path
1✔
1835
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1836
                // Check for tag
1✔
1837
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1838
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1839
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1840
                        }
1✔
1841
                }
1842
        }
1843
        return dockerURL
1✔
1844
}
1845

1846
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1847
        if val := cron.Labels[ann]; val != "" {
2✔
1848
                cc.AddLabel(dv, ann, val)
1✔
1849
        }
1✔
1850
}
1851

1852
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1853
        if val := cron.Annotations[ann]; val != "" {
1✔
1854
                cc.AddAnnotation(dv, ann, val)
×
1855
        }
×
1856
}
1857

1858
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1859
        dataSource := &cdiv1.DataSource{
1✔
1860
                ObjectMeta: metav1.ObjectMeta{
1✔
1861
                        Name:      cron.Spec.ManagedDataSource,
1✔
1862
                        Namespace: cron.Namespace,
1✔
1863
                },
1✔
1864
        }
1✔
1865
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1866
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1867
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1✔
1868
        return dataSource
1✔
1869
}
1✔
1870

1871
// Create DataVolume name based on the DataSource name + prefix of the digest
1872
func createDvName(prefix, digest string) (string, error) {
1✔
1873
        digestPrefix := ""
1✔
1874
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1875
                digestPrefix = digestSha256Prefix
1✔
1876
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1877
                digestPrefix = digestUIDPrefix
1✔
1878
        } else {
2✔
1879
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1880
        }
1✔
1881
        fromIdx := len(digestPrefix)
1✔
1882
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1883
        if len(digest) < toIdx {
2✔
1884
                return "", errors.Errorf("Digest is too short")
1✔
1885
        }
1✔
1886
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1887
}
1888

1889
// GetCronJobName get CronJob name based on cron name and UID
1890
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1891
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1892
}
1✔
1893

1894
// GetInitialJobName get initial job name based on cron name and UID
1895
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1896
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1897
}
1✔
1898

1899
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1900
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1✔
1901
}
1✔
1902

1903
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1904
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1905
}
1✔
1906

1907
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1908
        dv := &cron.Spec.Template
1✔
1909

1✔
1910
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1911
                return explicitVolumeMode, nil
×
1912
        }
×
1913

1914
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1915
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1916
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1917
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1918
                        AccessModes:      accessModes,
1✔
1919
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1920
                        Resources: corev1.VolumeResourceRequirements{
1✔
1921
                                Requests: corev1.ResourceList{
1✔
1922
                                        // Doesn't matter
1✔
1923
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1924
                                },
1✔
1925
                        },
1✔
1926
                },
1✔
1927
        }
1✔
1928
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1929
                return nil, err
×
1930
        }
×
1931

1932
        return inferredPvc.Spec.VolumeMode, nil
1✔
1933
}
1934

1935
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1936
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1937
        if dv.Spec.PVC != nil {
1✔
1938
                return dv.Spec.PVC.VolumeMode
×
1939
        }
×
1940

1941
        if dv.Spec.Storage != nil {
2✔
1942
                return dv.Spec.Storage.VolumeMode
1✔
1943
        }
1✔
1944

1945
        return nil
×
1946
}
1947

1948
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1949
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1950
        if dv.Spec.PVC != nil {
1✔
1951
                return dv.Spec.PVC.AccessModes
×
1952
        }
×
1953

1954
        if dv.Spec.Storage != nil {
2✔
1955
                return dv.Spec.Storage.AccessModes
1✔
1956
        }
1✔
1957

1958
        return nil
×
1959
}
1960

1961
func inferAdvisedRestoreSizeForSnapshot(dv *cdiv1.DataVolume, snapshot *snapshotv1.VolumeSnapshot, fallback *resource.Quantity) *resource.Quantity {
1✔
1962
        var dvSize resource.Quantity
1✔
1963

1✔
1964
        if dv.Spec.PVC != nil {
1✔
1965
                dvSize = dv.Spec.PVC.Resources.Requests[corev1.ResourceStorage]
×
1966
        }
×
1967

1968
        if dv.Spec.Storage != nil {
2✔
1969
                dvSize = dv.Spec.Storage.Resources.Requests[corev1.ResourceStorage]
1✔
1970
        }
1✔
1971

1972
        if dvSize.IsZero() && fallback != nil {
2✔
1973
                return fallback
1✔
1974
        }
1✔
1975

1976
        if snapshot.Status != nil {
2✔
1977
                if rs := snapshot.Status.RestoreSize; rs != nil && dvSize.Cmp(*rs) < 0 {
2✔
1978
                        return snapshot.Status.RestoreSize
1✔
1979
                }
1✔
1980
        }
1981

1982
        return &dvSize
1✔
1983
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc