• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5661

11 Nov 2025 01:07AM UTC coverage: 58.782% (-0.3%) from 59.076%
#5661

Pull #3883

travis-ci

Davo911
Fix buggy manifest

Signed-off-by: Thomas-David Griedel griedel911@gmail.com
Pull Request #3883: Publish test images in a multiarch manifest

17273 of 29385 relevant lines covered (58.78%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.7
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        batchv1 "k8s.io/api/batch/v1"
37
        corev1 "k8s.io/api/core/v1"
38
        storagev1 "k8s.io/api/storage/v1"
39
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
40
        "k8s.io/apimachinery/pkg/api/meta"
41
        "k8s.io/apimachinery/pkg/api/resource"
42
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
43
        "k8s.io/apimachinery/pkg/labels"
44
        "k8s.io/apimachinery/pkg/runtime"
45
        "k8s.io/apimachinery/pkg/types"
46
        "k8s.io/client-go/tools/record"
47
        openapicommon "k8s.io/kube-openapi/pkg/common"
48
        "k8s.io/utils/ptr"
49

50
        "sigs.k8s.io/controller-runtime/pkg/client"
51
        "sigs.k8s.io/controller-runtime/pkg/controller"
52
        "sigs.k8s.io/controller-runtime/pkg/event"
53
        "sigs.k8s.io/controller-runtime/pkg/handler"
54
        "sigs.k8s.io/controller-runtime/pkg/manager"
55
        "sigs.k8s.io/controller-runtime/pkg/predicate"
56
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
57
        "sigs.k8s.io/controller-runtime/pkg/source"
58

59
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
60
        "kubevirt.io/containerized-data-importer/pkg/common"
61
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
62
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
63
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
64
        "kubevirt.io/containerized-data-importer/pkg/operator"
65
        "kubevirt.io/containerized-data-importer/pkg/util"
66
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
67
)
68

69
const (
70
        // ErrDataSourceAlreadyManaged is the reason of the warning event recorded on a DataImportCron
        // whose managed DataSource is already managed by another DataImportCron.
71
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
72
        // MessageDataSourceAlreadyManaged is the format string of that event's message. Its two %s
        // verbs take the DataSource name and the name of the DataImportCron already managing it.
73
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
74
)
75

76
// DataImportCronReconciler reconciles DataImportCron objects. It holds the clients,
// event recorder, logger and settings shared by its methods.
77
type DataImportCronReconciler struct {
78
        // client is used by the reconciler's methods for Get/Create/Update/Delete calls.
        client          client.Client
79
        // NOTE(review): uncachedClient presumably bypasses the cache; no use is visible in
        // this part of the file — confirm against its callers.
        uncachedClient  client.Client
80
        // recorder emits events, e.g. the ErrDataSourceAlreadyManaged warning.
        recorder        record.EventRecorder
81
        scheme          *runtime.Scheme
82
        // log is the logger the reconciler's methods derive their loggers from.
        log             logr.Logger
83
        // NOTE(review): image and pullPolicy presumably describe the container used by the
        // jobs/pods this controller creates — confirm where they are consumed.
        image           string
84
        pullPolicy      string
85
        // NOTE(review): cdiNamespace is presumably the namespace CDI is installed in — confirm.
        cdiNamespace    string
86
        // NOTE(review): installerLabels are presumably propagated to created resources — confirm.
        installerLabels map[string]string
87
}
88

89
const (
90
        // AnnSourceDesiredDigest is the DataImportCron annotation holding the digest of the
        // pending updated image, i.e. the digest the next import should bring in.
91
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
92
        // AnnImageStreamDockerRef is the ImageStream Docker reference, stored together with
        // the desired digest when an ImageStream source is polled.
93
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
94
        // AnnNextCronTime is the next time stamp which satisfies the cron expression,
        // formatted as RFC 3339.
95
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
96
        // AnnLastCronTime is the cron last execution time stamp, formatted as RFC 3339;
        // it is set when the source digest is polled.
97
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
98
        // AnnLastUseTime is the last use time stamp (RFC 3339 with nanoseconds, UTC) that
        // updateSource stamps on an import source object (a PVC or a VolumeSnapshot).
99
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
100
        // AnnStorageClass is the cron DV's storage class, recorded on the DataImportCron so
        // that a later change of the desired storage class can be detected.
101
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
102

103
        // NOTE(review): the constants below are not used in the visible part of the file;
        // the descriptions are inferred from their names and values — confirm at the use sites.
        // dataImportControllerName is presumably the name this controller registers under.
        dataImportControllerName    = "dataimportcron-controller"
104
        // digestSha256Prefix and digestUIDPrefix are presumably the prefixes that tell an
        // image digest from a UID-based digest.
        digestSha256Prefix          = "sha256:"
105
        digestUIDPrefix             = "uid:"
106
        // digestDvNameSuffixLength and cronJobUIDSuffixLength are presumably the number of
        // characters taken from a digest / UID when building object names.
        digestDvNameSuffixLength    = 12
107
        cronJobUIDSuffixLength      = 8
108
        // defaultImportsToKeepPerCron is presumably the garbage collection default when the
        // cron does not say how many imports to keep.
        defaultImportsToKeepPerCron = 3
109
)
110

111
// ErrNotManagedByCron is returned by getDataSource when the DataSource's DataImportCron
// label does not name the DataImportCron being reconciled.
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
112

113
// Reconcile loop for DataImportCronReconciler
114
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
115
        dataImportCron := &cdiv1.DataImportCron{}
1✔
116
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
117
                return reconcile.Result{}, err
1✔
118
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
119
                err := r.cleanup(ctx, req.NamespacedName)
1✔
120
                return reconcile.Result{}, err
1✔
121
        }
1✔
122
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
123
        if !shouldReconcile || err != nil {
1✔
124
                return reconcile.Result{}, err
×
125
        }
×
126

127
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
128
                return reconcile.Result{}, err
×
129
        }
×
130

131
        return r.update(ctx, dataImportCron)
1✔
132
}
133

134
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
135
        dataSource := &cdiv1.DataSource{}
1✔
136
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
137
                if k8serrors.IsNotFound(err) {
2✔
138
                        return true, nil
1✔
139
                }
1✔
140
                return false, err
×
141
        }
142
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
143
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
144
                return true, nil
1✔
145
        }
1✔
146
        otherCron := &cdiv1.DataImportCron{}
1✔
147
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
148
                if k8serrors.IsNotFound(err) {
2✔
149
                        return true, nil
1✔
150
                }
1✔
151
                return false, err
×
152
        }
153
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
154
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
155
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
156
                r.log.V(3).Info(msg)
1✔
157
                return false, nil
1✔
158
        }
1✔
159
        return true, nil
×
160
}
161

162
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
163
        if dataImportCron.Spec.Schedule == "" {
2✔
164
                return nil
1✔
165
        }
1✔
166
        if isControllerPolledSource(dataImportCron) {
2✔
167
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
168
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
169
                }
1✔
170
                return nil
1✔
171
        }
172
        if !isURLSource(dataImportCron) {
1✔
173
                return nil
×
174
        }
×
175
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
176
        if exists || err != nil {
2✔
177
                return err
1✔
178
        }
1✔
179
        cronJob, err := r.newCronJob(dataImportCron)
1✔
180
        if err != nil {
1✔
181
                return err
×
182
        }
×
183
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
184
                return err
×
185
        }
×
186
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
187
        if err != nil {
1✔
188
                return err
×
189
        }
×
190
        if err := r.client.Create(ctx, job); err != nil {
1✔
191
                return err
×
192
        }
×
193
        return nil
1✔
194
}
195

196
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
197
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
198
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
199
        }
×
200
        imageStream := &imagev1.ImageStream{}
1✔
201
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
202
        if err != nil {
2✔
203
                return nil, "", err
1✔
204
        }
1✔
205
        imageStreamNamespacedName := types.NamespacedName{
1✔
206
                Namespace: imageStreamNamespace,
1✔
207
                Name:      name,
1✔
208
        }
1✔
209
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
210
                return nil, "", err
1✔
211
        }
1✔
212
        return imageStream, tag, nil
1✔
213
}
214

215
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
216
        if imageStream == nil {
1✔
217
                return "", "", errors.Errorf("No ImageStream")
×
218
        }
×
219
        tags := imageStream.Status.Tags
1✔
220
        if len(tags) == 0 {
1✔
221
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
222
        }
×
223

224
        tagIdx := 0
1✔
225
        if len(imageStreamTag) > 0 {
2✔
226
                tagIdx = -1
1✔
227
                for i, tag := range tags {
2✔
228
                        if tag.Tag == imageStreamTag {
2✔
229
                                tagIdx = i
1✔
230
                                break
1✔
231
                        }
232
                }
233
        }
234
        if tagIdx == -1 {
2✔
235
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
236
        }
1✔
237

238
        if len(tags[tagIdx].Items) == 0 {
2✔
239
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
240
        }
1✔
241
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
242
}
243

244
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
245
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
246
                return imageStreamName, "", nil
1✔
247
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
248
                return subs[0], subs[1], nil
1✔
249
        }
1✔
250
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
251
}
252

253
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
254
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
255
        if nextTimeStr == "" {
1✔
256
                return r.setNextCronTime(dataImportCron)
×
257
        }
×
258
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
259
        if err != nil {
1✔
260
                return reconcile.Result{}, err
×
261
        }
×
262
        if nextTime.After(time.Now()) {
2✔
263
                return r.setNextCronTime(dataImportCron)
1✔
264
        }
1✔
265
        switch {
1✔
266
        case isImageStreamSource(dataImportCron):
1✔
267
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
268
                        return reconcile.Result{}, err
1✔
269
                }
1✔
270
        case isPvcSource(dataImportCron):
1✔
271
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
272
                        return reconcile.Result{}, err
1✔
273
                }
1✔
274
        case isNodePull(dataImportCron):
1✔
275
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
276
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
277
                } else if err != nil {
2✔
278
                        return reconcile.Result{}, err
×
279
                }
×
280
        }
281

282
        return r.setNextCronTime(dataImportCron)
1✔
283
}
284

285
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
286
        now := time.Now()
1✔
287
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
288
        if err != nil {
1✔
289
                return reconcile.Result{}, err
×
290
        }
×
291
        nextTime := expr.Next(now)
1✔
292
        requeueAfter := nextTime.Sub(now)
1✔
293
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
294
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
295
        return res, err
1✔
296
}
297

298
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
299
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
300
        return err == nil && regSource.ImageStream != nil
1✔
301
}
1✔
302

303
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
304
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
305
        return err == nil && regSource.URL != nil
1✔
306
}
1✔
307

308
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
309
        regSource, err := getCronRegistrySource(cron)
1✔
310
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
311
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
312
}
1✔
313

314
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
315
        if !isCronRegistrySource(cron) {
2✔
316
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
317
        }
1✔
318
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
319
}
320

321
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
322
        source := cron.Spec.Template.Spec.Source
1✔
323
        return source != nil && source.Registry != nil
1✔
324
}
1✔
325

326
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
327
        if !isPvcSource(cron) {
1✔
328
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
329
        }
×
330
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
331
}
332

333
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
334
        source := cron.Spec.Template.Spec.Source
1✔
335
        return source != nil && source.PVC != nil
1✔
336
}
1✔
337

338
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
339
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
340
}
1✔
341

342
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
343
        res := reconcile.Result{}
1✔
344

1✔
345
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
346
        if err != nil {
1✔
347
                return res, err
×
348
        }
×
349

350
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
351
        imports := dataImportCron.Status.CurrentImports
1✔
352
        importSucceeded := false
1✔
353

1✔
354
        dataVolume := dataImportCron.Spec.Template
1✔
355
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
356
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
357
        if err != nil {
1✔
358
                return res, err
×
359
        }
×
360
        if desiredStorageClass != nil {
2✔
361
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
362
                        return res, err
1✔
363
                }
1✔
364
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
365
                desiredSc := desiredStorageClass.Name
1✔
366
                if hasCurrent && currentSc != desiredSc {
2✔
367
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
368
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
369
                                return res, err
×
370
                        }
×
371
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
372
                }
373
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
374
        }
375
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
376
        if err != nil {
1✔
377
                return res, err
×
378
        }
×
379
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
380
        if err != nil {
1✔
381
                return res, err
×
382
        }
×
383

384
        handlePopulatedPvc := func() error {
2✔
385
                if pvc != nil {
2✔
386
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
387
                                return err
×
388
                        }
×
389
                }
390
                importSucceeded = true
1✔
391
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
392
                        return err
×
393
                }
×
394

395
                return nil
1✔
396
        }
397

398
        switch {
1✔
399
        case dv != nil:
1✔
400
                switch dv.Status.Phase {
1✔
401
                case cdiv1.Succeeded:
1✔
402
                        if err := handlePopulatedPvc(); err != nil {
1✔
403
                                return res, err
×
404
                        }
×
405
                case cdiv1.ImportScheduled:
1✔
406
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
407
                case cdiv1.ImportInProgress:
1✔
408
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
409
                default:
1✔
410
                        dvPhase := string(dv.Status.Phase)
1✔
411
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
412
                }
413
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
414
                if err := handlePopulatedPvc(); err != nil {
1✔
415
                        return res, err
×
416
                }
×
417
        case snapshot != nil:
1✔
418
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
419
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
420
                                return res, err
×
421
                        }
×
422
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
423
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
424
                }
425
                // Below k8s 1.29 there's no way to know the source volume mode
426
                // Let's at least expose this info on our own snapshots
427
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
428
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
429
                        if err != nil {
1✔
430
                                return res, err
×
431
                        }
×
432
                        if volMode != nil {
2✔
433
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
434
                        }
1✔
435
                }
436
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
437
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
438
                if err != nil {
2✔
439
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
440
                                return res, err
×
441
                        }
×
442
                } else {
1✔
443
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
444
                }
1✔
445
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
446
                        return res, err
×
447
                }
×
448
                importSucceeded = true
1✔
449
        default:
1✔
450
                if len(imports) > 0 {
2✔
451
                        imports = imports[1:]
1✔
452
                        dataImportCron.Status.CurrentImports = imports
1✔
453
                }
1✔
454
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
455
        }
456

457
        if importSucceeded {
2✔
458
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
459
                        return res, err
×
460
                }
×
461
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
462
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
463
                        return res, err
×
464
                }
×
465
        }
466

467
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
468
                return res, err
×
469
        }
×
470

471
        // Skip if schedule is disabled
472
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
473
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
474
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
475
                if err != nil {
2✔
476
                        return pollRes, err
1✔
477
                }
1✔
478
                res = pollRes
1✔
479
        }
480

481
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
482
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
483
        if digestUpdated {
2✔
484
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
485
                if dv != nil {
1✔
486
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
487
                                return res, err
×
488
                        }
×
489
                }
490
                if importSucceeded || len(imports) == 0 {
2✔
491
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
492
                                return res, err
1✔
493
                        }
1✔
494
                }
495
        } else if importSucceeded {
2✔
496
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
497
                        return res, err
×
498
                }
×
499
        } else if len(imports) > 0 {
2✔
500
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
501
        } else {
2✔
502
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
503
        }
1✔
504

505
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
506
                return res, err
×
507
        }
×
508

509
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
510
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
511
                        return res, err
×
512
                }
×
513
        }
514
        return res, nil
1✔
515
}
516

517
// Returns the current import DV if exists, and the last imported PVC
518
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
519
        imports := cron.Status.CurrentImports
1✔
520
        if len(imports) == 0 {
2✔
521
                return nil, nil, nil
1✔
522
        }
1✔
523

524
        dvName := imports[0].DataVolumeName
1✔
525
        dv := &cdiv1.DataVolume{}
1✔
526
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
527
                if !k8serrors.IsNotFound(err) {
1✔
528
                        return nil, nil, err
×
529
                }
×
530
                dv = nil
1✔
531
        }
532

533
        pvc := &corev1.PersistentVolumeClaim{}
1✔
534
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
535
                if !k8serrors.IsNotFound(err) {
1✔
536
                        return nil, nil, err
×
537
                }
×
538
                pvc = nil
1✔
539
        }
540
        return dv, pvc, nil
1✔
541
}
542

543
// Returns the current import DV if exists, and the last imported PVC
544
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
545
        imports := cron.Status.CurrentImports
1✔
546
        if len(imports) == 0 {
2✔
547
                return nil, nil
1✔
548
        }
1✔
549

550
        snapName := imports[0].DataVolumeName
1✔
551
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
552
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
553
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
554
                        return nil, err
×
555
                }
×
556
                return nil, nil
1✔
557
        }
558

559
        return snapshot, nil
1✔
560
}
561

562
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
563
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
564
        dataSource := &cdiv1.DataSource{}
1✔
565
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
566
                return nil, err
1✔
567
        }
1✔
568
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
569
                log := r.log.WithName("getCronManagedDataSource")
×
570
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
571
                return nil, ErrNotManagedByCron
×
572
        }
×
573
        return dataSource, nil
1✔
574
}
575

576
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
577
        objCopy := obj.DeepCopyObject()
1✔
578
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
579
        r.setDataImportCronResourceLabels(cron, obj)
1✔
580
        if !reflect.DeepEqual(obj, objCopy) {
2✔
581
                if err := r.client.Update(ctx, obj); err != nil {
1✔
582
                        return err
×
583
                }
×
584
        }
585
        return nil
1✔
586
}
587

588
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
589
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
590
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
591
                if cond.Status == corev1.ConditionFalse &&
×
592
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
593
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
594
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
595
                        dv.Labels[common.DataImportCronLabel] = ""
×
596
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
597
                                return err
×
598
                        }
×
599
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
600
                                return err
×
601
                        }
×
602
                        cron.Status.CurrentImports = nil
×
603
                }
604
        }
605
        return nil
×
606
}
607

608
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
609
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
610
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
611
        if err != nil {
1✔
612
                return err
×
613
        }
×
614
        if regSource.ImageStream == nil {
1✔
615
                return nil
×
616
        }
×
617
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
618
        if err != nil {
2✔
619
                return err
1✔
620
        }
1✔
621
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
622
        if err != nil {
2✔
623
                return err
1✔
624
        }
1✔
625
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
626
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
627
                log.Info("Updating DataImportCron", "digest", digest)
1✔
628
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
629
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
630
        }
1✔
631
        return nil
1✔
632
}
633

634
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
635
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
636
        podName := getPollerPodName(cron)
1✔
637
        ns := cron.Namespace
1✔
638
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
639
        pod := &corev1.Pod{}
1✔
640

1✔
641
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
642
                digest, err := fetchContainerImageDigest(pod)
1✔
643
                if err != nil || digest == "" {
1✔
644
                        return false, err
×
645
                }
×
646
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
647
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
648
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
649
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
650
                }
1✔
651
                return true, r.client.Delete(ctx, pod)
1✔
652
        } else if cc.IgnoreNotFound(err) != nil {
1✔
653
                return false, err
×
654
        }
×
655

656
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
657
        if err != nil {
1✔
658
                return false, err
×
659
        }
×
660
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
1✔
661
        if platform != nil && platform.Architecture != "" {
1✔
662
                if workloadNodePlacement.NodeSelector == nil {
×
663
                        workloadNodePlacement.NodeSelector = map[string]string{}
×
664
                }
×
665
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
666
        }
667

668
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
669

1✔
670
        pod = &corev1.Pod{
1✔
671
                ObjectMeta: metav1.ObjectMeta{
1✔
672
                        Name:      podName,
1✔
673
                        Namespace: ns,
1✔
674
                        OwnerReferences: []metav1.OwnerReference{
1✔
675
                                {
1✔
676
                                        APIVersion:         cron.APIVersion,
1✔
677
                                        Kind:               cron.Kind,
1✔
678
                                        Name:               cron.Name,
1✔
679
                                        UID:                cron.UID,
1✔
680
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
681
                                        Controller:         ptr.To[bool](true),
1✔
682
                                },
1✔
683
                        },
1✔
684
                },
1✔
685
                Spec: corev1.PodSpec{
1✔
686
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
687
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
688
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
689
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
690
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
691
                        Volumes: []corev1.Volume{
1✔
692
                                {
1✔
693
                                        Name: "shared-volume",
1✔
694
                                        VolumeSource: corev1.VolumeSource{
1✔
695
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
696
                                        },
1✔
697
                                },
1✔
698
                        },
1✔
699
                        InitContainers: []corev1.Container{
1✔
700
                                {
1✔
701
                                        Name:                     "init",
1✔
702
                                        Image:                    r.image,
1✔
703
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
704
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
705
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
706
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
707
                                },
1✔
708
                        },
1✔
709
                        Containers: []corev1.Container{
1✔
710
                                {
1✔
711
                                        Name:                     "image-container",
1✔
712
                                        Image:                    containerImage,
1✔
713
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
714
                                        Command:                  []string{"/shared/server", "-h"},
1✔
715
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
716
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
717
                                },
1✔
718
                        },
1✔
719
                },
1✔
720
        }
1✔
721

1✔
722
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
723
        if pod.Spec.SecurityContext != nil {
2✔
724
                pod.Spec.SecurityContext.FSGroup = nil
1✔
725
        }
1✔
726

727
        return false, r.client.Create(ctx, pod)
1✔
728
}
729

730
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
731
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
732
                return "", nil
×
733
        }
×
734

735
        status := pod.Status.ContainerStatuses[0]
1✔
736
        if status.State.Waiting != nil {
1✔
737
                reason := status.State.Waiting.Reason
×
738
                switch reason {
×
739
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
740
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
741
                }
742
                return "", nil
×
743
        }
744

745
        if status.State.Terminated == nil {
1✔
746
                return "", nil
×
747
        }
×
748

749
        imageID := status.ImageID
1✔
750
        if imageID == "" {
1✔
751
                return "", errors.Errorf("Container has no imageID")
×
752
        }
×
753
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
754
        if idx < 0 {
1✔
755
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
756
        }
×
757

758
        return imageID[idx:], nil
1✔
759
}
760

761
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
762
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
763
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
764
        if err != nil {
1✔
765
                return err
×
766
        }
×
767
        ns := pvcSource.Namespace
1✔
768
        if ns == "" {
2✔
769
                ns = dataImportCron.Namespace
1✔
770
        }
1✔
771
        pvc := &corev1.PersistentVolumeClaim{}
1✔
772
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
773
                return err
1✔
774
        }
1✔
775
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
776
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
777
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
778
                log.Info("Updating DataImportCron", "digest", digest)
1✔
779
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
780
        }
1✔
781
        return nil
1✔
782
}
783

784
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
785
        log := r.log.WithName("updateDataSource")
1✔
786
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
787
        if err != nil {
2✔
788
                if k8serrors.IsNotFound(err) {
2✔
789
                        dataSource = r.newDataSource(dataImportCron)
1✔
790
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
791
                                return err
×
792
                        }
×
793
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
794
                } else if errors.Is(err, ErrNotManagedByCron) {
×
795
                        return nil
×
796
                } else {
×
797
                        return err
×
798
                }
×
799
        }
800
        dataSourceCopy := dataSource.DeepCopy()
1✔
801
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
802

1✔
803
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
804
        populateDataSource(format, dataSource, sourcePVC)
1✔
805

1✔
806
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
807
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
808
                        return err
×
809
                }
×
810
        }
811

812
        return nil
1✔
813
}
814

815
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
816
        if sourcePVC == nil {
2✔
817
                return
1✔
818
        }
1✔
819

820
        switch format {
1✔
821
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
822
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
823
                        PVC: sourcePVC,
1✔
824
                }
1✔
825
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
826
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
827
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
828
                                Namespace: sourcePVC.Namespace,
1✔
829
                                Name:      sourcePVC.Name,
1✔
830
                        },
1✔
831
                }
1✔
832
        }
833
}
834

835
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
836
        if dataImportCron.Status.CurrentImports == nil {
1✔
837
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
838
        }
×
839
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
840
                Namespace: dataImportCron.Namespace,
1✔
841
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
842
        }
1✔
843
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
844
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
845
                now := metav1.Now()
1✔
846
                dataImportCron.Status.LastImportTimestamp = &now
1✔
847
        }
1✔
848
        return nil
1✔
849
}
850

851
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
852
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
853
        if lastTimeStr == "" {
2✔
854
                return nil
1✔
855
        }
1✔
856
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
857
        if err != nil {
1✔
858
                return err
×
859
        }
×
860
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
861
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
862
        }
1✔
863
        return nil
1✔
864
}
865

866
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
867
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
868
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
869
        if digest == "" {
1✔
870
                return nil
×
871
        }
×
872
        dvName, err := createDvName(dataSourceName, digest)
1✔
873
        if err != nil {
2✔
874
                return err
1✔
875
        }
1✔
876
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
877

1✔
878
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
879
        for _, src := range sources {
2✔
880
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
881
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
882
                                return err
×
883
                        }
×
884
                } else {
1✔
885
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
886
                                return err
×
887
                        }
×
888
                        // If source exists don't create DV
889
                        return nil
1✔
890
                }
891
        }
892

893
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
894
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
895
                return err
×
896
        }
×
897

898
        return nil
1✔
899
}
900

901
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
902
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
903
        if !ok {
1✔
904
                // nothing to delete
×
905
                return nil
×
906
        }
×
907
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
908
        if err != nil {
1✔
909
                return err
×
910
        }
×
911
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
912
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
913
        for _, src := range sources {
2✔
914
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
915
                        return err
×
916
                }
×
917
        }
918
        for _, src := range sources {
2✔
919
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
920
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
921
                }
×
922
        }
923
        // Only update desired storage class once garbage collection went through
924
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
925
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
926
        if err != nil {
1✔
927
                return err
×
928
        }
×
929

930
        return nil
1✔
931
}
932

933
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
934
        switch format {
1✔
935
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
936
                return nil
1✔
937
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
938
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
939
        default:
×
940
                return fmt.Errorf("unknown source format for snapshot")
×
941
        }
942
}
943

944
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
945
        if pvc == nil {
1✔
946
                return nil
×
947
        }
×
948
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
949
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
950
                return nil
1✔
951
        }
1✔
952
        storageProfile := &cdiv1.StorageProfile{}
1✔
953
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
954
                return err
×
955
        }
×
956
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
957
        if err != nil {
1✔
958
                return err
×
959
        }
×
960
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
961
                ObjectMeta: metav1.ObjectMeta{
1✔
962
                        Name:      pvc.Name,
1✔
963
                        Namespace: dataImportCron.Namespace,
1✔
964
                        Labels: map[string]string{
1✔
965
                                common.CDILabelKey:       common.CDILabelValue,
1✔
966
                                common.CDIComponentLabel: "",
1✔
967
                        },
1✔
968
                },
1✔
969
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
970
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
971
                                PersistentVolumeClaimName: &pvc.Name,
1✔
972
                        },
1✔
973
                        VolumeSnapshotClassName: &className,
1✔
974
                },
1✔
975
        }
1✔
976
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
977
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
978

1✔
979
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
980
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
981
                if !k8serrors.IsNotFound(err) {
1✔
982
                        return err
×
983
                }
×
984
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
985
                if pvc.Spec.VolumeMode != nil {
2✔
986
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
987
                }
1✔
988
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
989
                        return err
×
990
                }
×
991
        } else {
1✔
992
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
993
                        // Clean up DV/PVC as they are not needed anymore
1✔
994
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
995
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
996
                                return err
×
997
                        }
×
998
                }
999
        }
1000

1001
        return nil
1✔
1002
}
1003

1004
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1005
        dataImportCron.Status.SourceFormat = &format
1✔
1006

1✔
1007
        switch format {
1✔
1008
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1009
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1010
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1011
                if snapshot == nil {
2✔
1012
                        // Snapshot create/update will trigger reconcile
1✔
1013
                        return nil
1✔
1014
                }
1✔
1015
                if cc.IsSnapshotReady(snapshot) {
2✔
1016
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1017
                } else {
2✔
1018
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1019
                }
1✔
1020
        default:
×
1021
                return fmt.Errorf("unknown source format for snapshot")
×
1022
        }
1023

1024
        return nil
1✔
1025
}
1026

1027
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1028
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1029
        if desiredStorageClass == nil {
2✔
1030
                return format, nil
1✔
1031
        }
1✔
1032

1033
        storageProfile := &cdiv1.StorageProfile{}
1✔
1034
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1035
                return format, err
×
1036
        }
×
1037
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1038
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1039
        }
1✔
1040

1041
        return format, nil
1✔
1042
}
1043

1044
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1045
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1046
                return nil
×
1047
        }
×
1048
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1049
        if err != nil {
1✔
1050
                return err
×
1051
        }
×
1052

1053
        maxImports := defaultImportsToKeepPerCron
1✔
1054

1✔
1055
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1056
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1057
        }
1✔
1058

1059
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1060
                return err
×
1061
        }
×
1062
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
1063
                return err
×
1064
        }
×
1065

1066
        return nil
1✔
1067
}
1068

1069
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1070
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1071

1✔
1072
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1073
                return err
×
1074
        }
×
1075
        if len(pvcList.Items) > maxImports {
2✔
1076
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1077
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1078
                })
1✔
1079
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1080
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1081
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1082
                                return err
×
1083
                        }
×
1084
                }
1085
        }
1086

1087
        dvList := &cdiv1.DataVolumeList{}
1✔
1088
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1089
                return err
×
1090
        }
×
1091

1092
        if len(dvList.Items) > maxImports {
2✔
1093
                for _, dv := range dvList.Items {
2✔
1094
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1095
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
1096
                                return err
×
1097
                        }
×
1098

1099
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1100
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1101
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
1102
                                        return err
×
1103
                                }
×
1104
                        }
1105
                }
1106
        }
1107

1108
        return nil
1✔
1109
}
1110

1111
// deleteDvPvc deletes DV or PVC if DV was GCed
1112
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1113
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1114
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1115
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1116
                return err
1✔
1117
        }
1✔
1118
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1119
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1120
                return err
×
1121
        }
×
1122
        return nil
1✔
1123
}
1124

1125
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1126
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1127

1✔
1128
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1129
                if meta.IsNoMatchError(err) {
×
1130
                        return nil
×
1131
                }
×
1132
                return err
×
1133
        }
1134
        if len(snapList.Items) > maxImports {
1✔
1135
                sort.Slice(snapList.Items, func(i, j int) bool {
×
1136
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1137
                })
×
1138
                for _, snap := range snapList.Items[maxImports:] {
×
1139
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
1140
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1141
                                return err
×
1142
                        }
×
1143
                }
1144
        }
1145

1146
        return nil
1✔
1147
}
1148

1149
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1150
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1151
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1152

1✔
1153
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1154
                return err
×
1155
        }
×
1156
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1157
        if err != nil {
1✔
1158
                return err
×
1159
        }
×
1160
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1161
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1162
                return err
×
1163
        }
×
1164
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1165
                return err
×
1166
        }
×
1167
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1168
                return err
×
1169
        }
×
1170
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1171
                return err
×
1172
        }
×
1173
        return nil
1✔
1174
}
1175

1176
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1177
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1178
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1179
        if err != nil {
1✔
1180
                return err
×
1181
        }
×
1182
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1183
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1184
                return err
×
1185
        }
×
1186
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1187
                return err
×
1188
        }
×
1189

1190
        return nil
1✔
1191
}
1192

1193
// NewDataImportCronController creates a new instance of the DataImportCron controller
1194
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1195
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1196
                Scheme: mgr.GetScheme(),
×
1197
                Mapper: mgr.GetRESTMapper(),
×
1198
        })
×
1199
        if err != nil {
×
1200
                return nil, err
×
1201
        }
×
1202
        reconciler := &DataImportCronReconciler{
×
1203
                client:          mgr.GetClient(),
×
1204
                uncachedClient:  uncachedClient,
×
1205
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1206
                scheme:          mgr.GetScheme(),
×
1207
                log:             log.WithName(dataImportControllerName),
×
1208
                image:           importerImage,
×
1209
                pullPolicy:      pullPolicy,
×
1210
                cdiNamespace:    util.GetNamespace(),
×
1211
                installerLabels: installerLabels,
×
1212
        }
×
1213
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1214
                MaxConcurrentReconciles: 3,
×
1215
                Reconciler:              reconciler,
×
1216
        })
×
1217
        if err != nil {
×
1218
                return nil, err
×
1219
        }
×
1220
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1221
                return nil, err
×
1222
        }
×
1223
        log.Info("Initialized DataImportCron controller")
×
1224
        return dataImportCronController, nil
×
1225
}
1226

1227
func getCronName(obj client.Object) string {
×
1228
        return obj.GetLabels()[common.DataImportCronLabel]
×
1229
}
×
1230

1231
func getCronNs(obj client.Object) string {
×
1232
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1233
}
×
1234

1235
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1236
        if cronName := getCronName(obj); cronName != "" {
×
1237
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1238
        }
×
1239
        return nil
×
1240
}
1241

1242
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1243
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1244
                return err
×
1245
        }
×
1246

1247
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1248
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1249
                // Otherwise we risk losing the storage profile event
×
1250
                var crons cdiv1.DataImportCronList
×
1251
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1252
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1253
                        return nil
×
1254
                }
×
1255
                // Storage profiles are 1:1 to storage classes
1256
                scName := obj.GetName()
×
1257
                var reqs []reconcile.Request
×
1258
                for _, cron := range crons.Items {
×
1259
                        dataVolume := cron.Spec.Template
×
1260
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1261
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1262
                        if err != nil || templateSc == nil {
×
1263
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1264
                                return reqs
×
1265
                        }
×
1266
                        if templateSc.Name == scName {
×
1267
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1268
                        }
×
1269
                }
1270
                return reqs
×
1271
        }
1272

1273
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1274
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1275
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1276
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1277
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1278
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1279
                },
1280
        )); err != nil {
×
1281
                return err
×
1282
        }
×
1283

1284
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1285
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1286
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1287
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1288
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1289
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1290
                },
1291
        )); err != nil {
×
1292
                return err
×
1293
        }
×
1294

1295
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1296
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1297
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1298
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1299
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1300
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1301
                },
1302
        )); err != nil {
×
1303
                return err
×
1304
        }
×
1305

1306
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1307
                return err
×
1308
        }
×
1309

1310
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1311
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1312
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1313
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1314
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1315
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1316
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
1317
                        },
×
1318
                },
1319
        )); err != nil {
×
1320
                return err
×
1321
        }
×
1322

1323
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1324
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1325
        }
×
1326

1327
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1328
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1329
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1330
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1331
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1332
                        },
×
1333
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1334
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1335
                },
1336
        )); err != nil {
×
1337
                return err
×
1338
        }
×
1339

1340
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1341
                if meta.IsNoMatchError(err) {
×
1342
                        // Back out if there's no point to attempt watch
×
1343
                        return nil
×
1344
                }
×
1345
                if !cc.IsErrCacheNotStarted(err) {
×
1346
                        return err
×
1347
                }
×
1348
        }
1349
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1350
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1351
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1352
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1353
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1354
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1355
                },
1356
        )); err != nil {
×
1357
                return err
×
1358
        }
×
1359

1360
        return nil
×
1361
}
1362

1363
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1364
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1365
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1366
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1367
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1368
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1369
                                log.Info("Update", "sc", obj.GetName(),
×
1370
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1371
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1372
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1373
                                if err != nil {
×
1374
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1375
                                }
×
1376
                                return reqs
×
1377
                        },
1378
                ),
1379
                predicate.TypedFuncs[*storagev1.StorageClass]{
1380
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1381
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1382
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1383
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1384
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1385
                        },
×
1386
                },
1387
        )); err != nil {
×
1388
                return err
×
1389
        }
×
1390

1391
        return nil
×
1392
}
1393

1394
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1395
        dicList := &cdiv1.DataImportCronList{}
×
1396
        if err := c.List(ctx, dicList); err != nil {
×
1397
                return nil, err
×
1398
        }
×
1399
        reqs := []reconcile.Request{}
×
1400
        for _, dic := range dicList.Items {
×
1401
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1402
                        continue
×
1403
                }
1404

1405
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1406
        }
1407

1408
        return reqs, nil
×
1409
}
1410

1411
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1412
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1413
                return false, nil
1✔
1414
        }
1✔
1415

1416
        sc := pvc.Spec.StorageClassName
1✔
1417
        if sc == nil || *sc == desiredStorageClass {
2✔
1418
                return false, nil
1✔
1419
        }
1✔
1420

1421
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1422
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1423
                return false, err
×
1424
        }
×
1425

1426
        return true, nil
1✔
1427
}
1428

1429
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1430
        cronJob := &batchv1.CronJob{}
1✔
1431
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1432
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1433
                return false, cc.IgnoreNotFound(err)
1✔
1434
        }
1✔
1435

1436
        cronJobCopy := cronJob.DeepCopy()
1✔
1437
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1438
                return false, err
×
1439
        }
×
1440

1441
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1442
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1443
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1444
                        return false, cc.IgnoreNotFound(err)
×
1445
                }
×
1446
        }
1447
        return true, nil
1✔
1448
}
1449

1450
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1451
        cronJob := &batchv1.CronJob{
1✔
1452
                ObjectMeta: metav1.ObjectMeta{
1✔
1453
                        Name:      GetCronJobName(cron),
1✔
1454
                        Namespace: r.cdiNamespace,
1✔
1455
                },
1✔
1456
        }
1✔
1457
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1458
                return nil, err
×
1459
        }
×
1460
        return cronJob, nil
1✔
1461
}
1462

1463
// InitPollerPodSpec inits poller PodSpec
1464
func InitPollerPodSpec(c client.Client, cron *cdiv1.DataImportCron, podSpec *corev1.PodSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1465
        regSource, err := getCronRegistrySource(cron)
1✔
1466
        if err != nil {
1✔
1467
                return err
×
1468
        }
×
1469
        if regSource.URL == nil {
1✔
1470
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1471
        }
×
1472
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1473
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1474
                return err
×
1475
        }
×
1476
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1477
        if err != nil {
1✔
1478
                return err
×
1479
        }
×
1480
        container := corev1.Container{
1✔
1481
                Name:  "cdi-source-update-poller",
1✔
1482
                Image: image,
1✔
1483
                Command: []string{
1✔
1484
                        "/usr/bin/cdi-source-update-poller",
1✔
1485
                        "-ns", cron.Namespace,
1✔
1486
                        "-cron", cron.Name,
1✔
1487
                        "-url", *regSource.URL,
1✔
1488
                },
1✔
1489
                ImagePullPolicy:          pullPolicy,
1✔
1490
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1491
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1492
        }
1✔
1493

1✔
1494
        var volumes []corev1.Volume
1✔
1495
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1496
        if hasCertConfigMap {
1✔
1497
                vm := corev1.VolumeMount{
×
1498
                        Name:      CertVolName,
×
1499
                        MountPath: common.ImporterCertDir,
×
1500
                }
×
1501
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1502
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1503
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1504
        }
×
1505

1506
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1507
                vm := corev1.VolumeMount{
1✔
1508
                        Name:      ProxyCertVolName,
1✔
1509
                        MountPath: common.ImporterProxyCertDir,
1✔
1510
                }
1✔
1511
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1512
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1513
        }
1✔
1514

1515
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1516
                container.Env = append(container.Env,
×
1517
                        corev1.EnvVar{
×
1518
                                Name: common.ImporterAccessKeyID,
×
1519
                                ValueFrom: &corev1.EnvVarSource{
×
1520
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1521
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1522
                                                        Name: *regSource.SecretRef,
×
1523
                                                },
×
1524
                                                Key: common.KeyAccess,
×
1525
                                        },
×
1526
                                },
×
1527
                        },
×
1528
                        corev1.EnvVar{
×
1529
                                Name: common.ImporterSecretKey,
×
1530
                                ValueFrom: &corev1.EnvVarSource{
×
1531
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1532
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1533
                                                        Name: *regSource.SecretRef,
×
1534
                                                },
×
1535
                                                Key: common.KeySecret,
×
1536
                                        },
×
1537
                                },
×
1538
                        },
×
1539
                )
×
1540
        }
×
1541

1542
        addEnvVar := func(varName, value string) {
2✔
1543
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1544
        }
1✔
1545

1546
        if insecureTLS {
1✔
1547
                addEnvVar(common.InsecureTLSVar, "true")
×
1548
        }
×
1549

1550
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1551
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1552
                        addEnvVar(varName, value)
1✔
1553
                }
1✔
1554
        }
1555

1556
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1557
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1558
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1559

1✔
1560
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1561
        if err != nil {
1✔
1562
                return err
×
1563
        }
×
1564
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1565
        if err != nil {
1✔
1566
                return err
×
1567
        }
×
1568

1569
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1570
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1571
        podSpec.Containers = []corev1.Container{container}
1✔
1572
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1573
        podSpec.Volumes = volumes
1✔
1574
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1575
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1576
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1577
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1578

1✔
1579
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1580
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1581
        if podSpec.SecurityContext != nil {
1✔
1582
                podSpec.SecurityContext.FSGroup = nil
1✔
1583
        }
2✔
1584
        if podSpec.Containers[0].SecurityContext != nil {
1✔
1585
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1586
        }
2✔
1587

1✔
1588
        return nil
1✔
1589
}
1590

2✔
1591
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1592
        cronJobSpec := &cronJob.Spec
1✔
1593
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1594
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1595
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1596
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1597

1598
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1599
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1600
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1601
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1602

1✔
1603
        podSpec := &jobSpec.Template.Spec
1✔
1604
        if err := InitPollerPodSpec(r.client, cron, podSpec, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1605
                return err
1✔
1606
        }
1✔
1607
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1608
                return err
1✔
1609
        }
1✔
1610
        return nil
1✔
1611
}
1✔
1612

×
1613
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
×
1614
        job := &batchv1.Job{
1✔
1615
                ObjectMeta: metav1.ObjectMeta{
×
1616
                        Name:      GetInitialJobName(cron),
×
1617
                        Namespace: cronJob.Namespace,
1✔
1618
                },
1619
                Spec: cronJob.Spec.JobTemplate.Spec,
1620
        }
1✔
1621
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1622
                return nil, err
1✔
1623
        }
1✔
1624
        return job, nil
1✔
1625
}
1✔
1626

1✔
1627
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1628
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1629
                return err
×
1630
        }
×
1631
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1632
        labels := obj.GetLabels()
1633
        labels[common.DataImportCronNsLabel] = cron.Namespace
1634
        labels[common.DataImportCronLabel] = cron.Name
1✔
1635
        obj.SetLabels(labels)
1✔
1636
        return nil
×
1637
}
×
1638

1✔
1639
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1640
        dv := cron.Spec.Template.DeepCopy()
1✔
1641
        if isCronRegistrySource(cron) {
1✔
1642
                var digestedURL string
1✔
1643
                if isURLSource(cron) {
1✔
1644
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1645
                } else if isImageStreamSource(cron) {
1646
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1647
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1648
                        dv.Spec.Source.Registry.ImageStream = nil
2✔
1649
                }
1✔
1650
                dv.Spec.Source.Registry.URL = &digestedURL
2✔
1651
        }
1✔
1652
        dv.Name = dataVolumeName
3✔
1653
        dv.Namespace = cron.Namespace
1✔
1654
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1655
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1656
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1657
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1658

1659
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
1✔
1660
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1661
        }
1✔
1662

1✔
1663
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1664

1✔
1665
        return dv
1✔
1666
}
2✔
1667

1✔
1668
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1669
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1670
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
1✔
1671
        labels := obj.GetLabels()
1✔
1672
        labels[common.DataImportCronLabel] = cron.Name
1✔
1673
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
1674
                labels[common.DataImportCronCleanupLabel] = "true"
1675
        }
1✔
1676
        obj.SetLabels(labels)
1✔
1677
}
1✔
1678

1✔
1679
func untagDigestedDockerURL(dockerURL string) string {
1✔
1680
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1681
                url := u.Host + u.Path
1✔
1682
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1683
                // Check for tag
1✔
1684
                if len(subs) > 2 && len(subs[2]) > 0 {
1685
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
1686
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1687
                        }
2✔
1688
                }
1✔
1689
        }
1✔
1690
        return dockerURL
1✔
1691
}
2✔
1692

2✔
1693
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1694
        if val := cron.Labels[ann]; val != "" {
1✔
1695
                cc.AddLabel(dv, ann, val)
1696
        }
1697
}
1✔
1698

1699
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1700
        if val := cron.Annotations[ann]; val != "" {
1✔
1701
                cc.AddAnnotation(dv, ann, val)
2✔
1702
        }
1✔
1703
}
1✔
1704

1705
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1706
        dataSource := &cdiv1.DataSource{
1✔
1707
                ObjectMeta: metav1.ObjectMeta{
1✔
1708
                        Name:      cron.Spec.ManagedDataSource,
×
1709
                        Namespace: cron.Namespace,
×
1710
                },
1711
        }
1712
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1713
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1714
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1✔
1715
        return dataSource
1✔
1716
}
1✔
1717

1✔
1718
// Create DataVolume name based on the DataSource name + prefix of the digest
1✔
1719
func createDvName(prefix, digest string) (string, error) {
1✔
1720
        digestPrefix := ""
1✔
1721
        if strings.HasPrefix(digest, digestSha256Prefix) {
1✔
1722
                digestPrefix = digestSha256Prefix
1✔
1723
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
1✔
1724
                digestPrefix = digestUIDPrefix
1725
        } else {
1726
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1727
        }
1✔
1728
        fromIdx := len(digestPrefix)
2✔
1729
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1730
        if len(digest) < toIdx {
3✔
1731
                return "", errors.Errorf("Digest is too short")
1✔
1732
        }
2✔
1733
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1734
}
1✔
1735

1✔
1736
// GetCronJobName get CronJob name based on cron name and UID
1✔
1737
func GetCronJobName(cron *cdiv1.DataImportCron) string {
2✔
1738
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1739
}
1✔
1740

1✔
1741
// GetInitialJobName get initial job name based on cron name and UID
1742
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1743
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1744
}
1✔
1745

1✔
1746
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1747
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1748
}
1749

1✔
1750
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1751
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1752
}
1753

1✔
1754
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1755
        dv := &cron.Spec.Template
1✔
1756

1757
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1758
                return explicitVolumeMode, nil
1✔
1759
        }
1✔
1760

1761
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1762
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1763
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1764
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1765
                        AccessModes:      accessModes,
×
1766
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
×
1767
                        Resources: corev1.VolumeResourceRequirements{
1768
                                Requests: corev1.ResourceList{
1✔
1769
                                        // Doesn't matter
1✔
1770
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1771
                                },
1✔
1772
                        },
1✔
1773
                },
1✔
1774
        }
1✔
1775
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1776
                return nil, err
1✔
1777
        }
1✔
1778

1✔
1779
        return inferredPvc.Spec.VolumeMode, nil
1✔
1780
}
1✔
1781

1✔
1782
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1✔
1783
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
×
1784
        if dv.Spec.PVC != nil {
×
1785
                return dv.Spec.PVC.VolumeMode
1786
        }
1✔
1787

1788
        if dv.Spec.Storage != nil {
1789
                return dv.Spec.Storage.VolumeMode
1790
        }
1✔
1791

1✔
1792
        return nil
×
1793
}
×
1794

1795
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
2✔
1796
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1797
        if dv.Spec.PVC != nil {
1✔
1798
                return dv.Spec.PVC.AccessModes
1799
        }
×
1800

1801
        if dv.Spec.Storage != nil {
1802
                return dv.Spec.Storage.AccessModes
1803
        }
1✔
1804

1✔
1805
        return nil
×
1806
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc