• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5459

11 Jul 2025 02:30AM UTC coverage: 59.487% (+0.06%) from 59.426%
#5459

push

travis-ci

web-flow
Support DIC DV node pullMethod for update polling (#3798)

* Support DIC DV node pullMethod for update polling

It allows the DataImportCron controller to fetch a registry
containerImage imageID digest by a do-nothing image-puller container,
without depending on OpenShift ImageStream or cronjobs. k8s node caching
makes it also quite cheap. If the image was not updated, it is already
cached. If the image is a new one, k8s will pull and cache it, so in the
following import it will be retrieved from the cache, assuming both pods
are scheduled on the same node.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

* Add utest and do minor cleanup

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

---------

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

105 of 126 new or added lines in 1 file covered. (83.33%)

11 existing lines in 2 files now uncovered.

17090 of 28729 relevant lines covered (59.49%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.62
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        batchv1 "k8s.io/api/batch/v1"
37
        corev1 "k8s.io/api/core/v1"
38
        storagev1 "k8s.io/api/storage/v1"
39
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
40
        "k8s.io/apimachinery/pkg/api/meta"
41
        "k8s.io/apimachinery/pkg/api/resource"
42
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
43
        "k8s.io/apimachinery/pkg/labels"
44
        "k8s.io/apimachinery/pkg/runtime"
45
        "k8s.io/apimachinery/pkg/types"
46
        "k8s.io/client-go/tools/record"
47
        openapicommon "k8s.io/kube-openapi/pkg/common"
48
        "k8s.io/utils/ptr"
49

50
        "sigs.k8s.io/controller-runtime/pkg/client"
51
        "sigs.k8s.io/controller-runtime/pkg/controller"
52
        "sigs.k8s.io/controller-runtime/pkg/event"
53
        "sigs.k8s.io/controller-runtime/pkg/handler"
54
        "sigs.k8s.io/controller-runtime/pkg/manager"
55
        "sigs.k8s.io/controller-runtime/pkg/predicate"
56
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
57
        "sigs.k8s.io/controller-runtime/pkg/source"
58

59
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
60
        "kubevirt.io/containerized-data-importer/pkg/common"
61
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
62
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
63
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
64
        "kubevirt.io/containerized-data-importer/pkg/operator"
65
        "kubevirt.io/containerized-data-importer/pkg/util"
66
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
67
)
68

69
// Event reason and message template emitted when the DataSource a
// DataImportCron wants to manage is already managed by another DataImportCron.
const (
	// ErrDataSourceAlreadyManaged is the event reason reported for a DataSource that is already managed
	ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
	// MessageDataSourceAlreadyManaged is the event message format; its arguments are
	// the DataSource name and the name of the DataImportCron managing it
	MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
)
75

76
// DataImportCronReconciler members
77
type DataImportCronReconciler struct {
78
        client          client.Client
79
        uncachedClient  client.Client
80
        recorder        record.EventRecorder
81
        scheme          *runtime.Scheme
82
        log             logr.Logger
83
        image           string
84
        pullPolicy      string
85
        cdiNamespace    string
86
        installerLabels map[string]string
87
}
88

89
const (
90
        // AnnSourceDesiredDigest is the digest of the pending updated image
91
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
92
        // AnnImageStreamDockerRef is the ImageStream Docker reference
93
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
94
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
95
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
96
        // AnnLastCronTime is the cron last execution time stamp
97
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
98
        // AnnLastUseTime is the PVC last use time stamp
99
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
100
        // AnnStorageClass is the cron DV's storage class
101
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
102

103
        dataImportControllerName    = "dataimportcron-controller"
104
        digestSha256Prefix          = "sha256:"
105
        digestUIDPrefix             = "uid:"
106
        digestDvNameSuffixLength    = 12
107
        cronJobUIDSuffixLength      = 8
108
        defaultImportsToKeepPerCron = 3
109
)
110

111
// ErrNotManagedByCron is returned by getDataSource when the DataSource does not
// carry the label of the DataImportCron asking for it.
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
112

113
// Reconcile loop for DataImportCronReconciler
114
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
115
        dataImportCron := &cdiv1.DataImportCron{}
1✔
116
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
117
                return reconcile.Result{}, err
1✔
118
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
119
                err := r.cleanup(ctx, req.NamespacedName)
1✔
120
                return reconcile.Result{}, err
1✔
121
        }
1✔
122
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
123
        if !shouldReconcile || err != nil {
1✔
124
                return reconcile.Result{}, err
×
125
        }
×
126

127
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
128
                return reconcile.Result{}, err
×
129
        }
×
130

131
        return r.update(ctx, dataImportCron)
1✔
132
}
133

134
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
135
        dataSource := &cdiv1.DataSource{}
1✔
136
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
137
                if k8serrors.IsNotFound(err) {
2✔
138
                        return true, nil
1✔
139
                }
1✔
140
                return false, err
×
141
        }
142
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
143
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
144
                return true, nil
1✔
145
        }
1✔
146
        otherCron := &cdiv1.DataImportCron{}
1✔
147
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
148
                if k8serrors.IsNotFound(err) {
2✔
149
                        return true, nil
1✔
150
                }
1✔
151
                return false, err
×
152
        }
153
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
154
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
155
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
156
                r.log.V(3).Info(msg)
1✔
157
                return false, nil
1✔
158
        }
1✔
159
        return true, nil
×
160
}
161

162
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
163
        if dataImportCron.Spec.Schedule == "" {
2✔
164
                return nil
1✔
165
        }
1✔
166
        if isControllerPolledSource(dataImportCron) {
2✔
167
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
168
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
169
                }
1✔
170
                return nil
1✔
171
        }
172
        if !isURLSource(dataImportCron) {
1✔
173
                return nil
×
174
        }
×
175
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
176
        if exists || err != nil {
2✔
177
                return err
1✔
178
        }
1✔
179
        cronJob, err := r.newCronJob(dataImportCron)
1✔
180
        if err != nil {
1✔
181
                return err
×
182
        }
×
183
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
184
                return err
×
185
        }
×
186
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
187
        if err != nil {
1✔
188
                return err
×
189
        }
×
190
        if err := r.client.Create(ctx, job); err != nil {
1✔
191
                return err
×
192
        }
×
193
        return nil
1✔
194
}
195

196
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
197
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
198
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
199
        }
×
200
        imageStream := &imagev1.ImageStream{}
1✔
201
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
202
        if err != nil {
2✔
203
                return nil, "", err
1✔
204
        }
1✔
205
        imageStreamNamespacedName := types.NamespacedName{
1✔
206
                Namespace: imageStreamNamespace,
1✔
207
                Name:      name,
1✔
208
        }
1✔
209
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
210
                return nil, "", err
1✔
211
        }
1✔
212
        return imageStream, tag, nil
1✔
213
}
214

215
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
216
        if imageStream == nil {
1✔
217
                return "", "", errors.Errorf("No ImageStream")
×
218
        }
×
219
        tags := imageStream.Status.Tags
1✔
220
        if len(tags) == 0 {
1✔
221
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
222
        }
×
223

224
        tagIdx := 0
1✔
225
        if len(imageStreamTag) > 0 {
2✔
226
                tagIdx = -1
1✔
227
                for i, tag := range tags {
2✔
228
                        if tag.Tag == imageStreamTag {
2✔
229
                                tagIdx = i
1✔
230
                                break
1✔
231
                        }
232
                }
233
        }
234
        if tagIdx == -1 {
2✔
235
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
236
        }
1✔
237

238
        if len(tags[tagIdx].Items) == 0 {
2✔
239
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
240
        }
1✔
241
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
242
}
243

244
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
245
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
246
                return imageStreamName, "", nil
1✔
247
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
248
                return subs[0], subs[1], nil
1✔
249
        }
1✔
250
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
251
}
252

253
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
254
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
255
        if nextTimeStr == "" {
1✔
256
                return r.setNextCronTime(dataImportCron)
×
257
        }
×
258
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
259
        if err != nil {
1✔
260
                return reconcile.Result{}, err
×
261
        }
×
262
        if nextTime.After(time.Now()) {
2✔
263
                return r.setNextCronTime(dataImportCron)
1✔
264
        }
1✔
265
        switch {
1✔
266
        case isImageStreamSource(dataImportCron):
1✔
267
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
268
                        return reconcile.Result{}, err
1✔
269
                }
1✔
270
        case isPvcSource(dataImportCron):
1✔
271
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
272
                        return reconcile.Result{}, err
1✔
273
                }
1✔
274
        case isNodePull(dataImportCron):
1✔
275
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
276
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
277
                } else if err != nil {
2✔
NEW
278
                        return reconcile.Result{}, err
×
NEW
279
                }
×
280
        }
281

282
        return r.setNextCronTime(dataImportCron)
1✔
283
}
284

285
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
286
        now := time.Now()
1✔
287
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
288
        if err != nil {
1✔
289
                return reconcile.Result{}, err
×
290
        }
×
291
        nextTime := expr.Next(now)
1✔
292
        requeueAfter := nextTime.Sub(now)
1✔
293
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
294
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
295
        return res, err
1✔
296
}
297

298
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
299
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
300
        return err == nil && regSource.ImageStream != nil
1✔
301
}
1✔
302

303
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
304
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
305
        return err == nil && regSource.URL != nil
1✔
306
}
1✔
307

308
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
309
        regSource, err := getCronRegistrySource(cron)
1✔
310
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
311
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
312
}
1✔
313

314
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
315
        if !isCronRegistrySource(cron) {
2✔
316
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
317
        }
1✔
318
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
319
}
320

321
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
322
        source := cron.Spec.Template.Spec.Source
1✔
323
        return source != nil && source.Registry != nil
1✔
324
}
1✔
325

326
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
327
        if !isPvcSource(cron) {
1✔
328
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
329
        }
×
330
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
331
}
332

333
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
334
        source := cron.Spec.Template.Spec.Source
1✔
335
        return source != nil && source.PVC != nil
1✔
336
}
1✔
337

338
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
339
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
340
}
1✔
341

342
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
343
        res := reconcile.Result{}
1✔
344

1✔
345
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
346
        if err != nil {
1✔
347
                return res, err
×
348
        }
×
349

350
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
351
        imports := dataImportCron.Status.CurrentImports
1✔
352
        importSucceeded := false
1✔
353

1✔
354
        dataVolume := dataImportCron.Spec.Template
1✔
355
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
356
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
357
        if err != nil {
1✔
358
                return res, err
×
359
        }
×
360
        if desiredStorageClass != nil {
2✔
361
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
362
                        return res, err
1✔
363
                }
1✔
364
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
365
                desiredSc := desiredStorageClass.Name
1✔
366
                if hasCurrent && currentSc != desiredSc {
2✔
367
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
368
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
369
                                return res, err
×
370
                        }
×
371
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
372
                }
373
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
374
        }
375
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
376
        if err != nil {
1✔
377
                return res, err
×
378
        }
×
379
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
380
        if err != nil {
1✔
381
                return res, err
×
382
        }
×
383

384
        handlePopulatedPvc := func() error {
2✔
385
                if pvc != nil {
2✔
386
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
387
                                return err
×
388
                        }
×
389
                }
390
                importSucceeded = true
1✔
391
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
392
                        return err
×
393
                }
×
394

395
                return nil
1✔
396
        }
397

398
        switch {
1✔
399
        case dv != nil:
1✔
400
                switch dv.Status.Phase {
1✔
401
                case cdiv1.Succeeded:
1✔
402
                        if err := handlePopulatedPvc(); err != nil {
1✔
403
                                return res, err
×
404
                        }
×
405
                case cdiv1.ImportScheduled:
1✔
406
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
407
                case cdiv1.ImportInProgress:
1✔
408
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
409
                default:
1✔
410
                        dvPhase := string(dv.Status.Phase)
1✔
411
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
412
                }
413
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
414
                if err := handlePopulatedPvc(); err != nil {
1✔
415
                        return res, err
×
416
                }
×
417
        case snapshot != nil:
1✔
418
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
419
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
420
                                return res, err
×
421
                        }
×
422
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
423
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
424
                }
425
                // Below k8s 1.29 there's no way to know the source volume mode
426
                // Let's at least expose this info on our own snapshots
427
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
428
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
429
                        if err != nil {
1✔
430
                                return res, err
×
431
                        }
×
432
                        if volMode != nil {
2✔
433
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
434
                        }
1✔
435
                }
436
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
437
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
438
                if err != nil {
2✔
439
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
440
                                return res, err
×
441
                        }
×
442
                } else {
1✔
443
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
444
                }
1✔
445
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
446
                        return res, err
×
447
                }
×
448
                importSucceeded = true
1✔
449
        default:
1✔
450
                if len(imports) > 0 {
2✔
451
                        imports = imports[1:]
1✔
452
                        dataImportCron.Status.CurrentImports = imports
1✔
453
                }
1✔
454
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
455
        }
456

457
        if importSucceeded {
2✔
458
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
459
                        return res, err
×
460
                }
×
461
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
462
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
463
                        return res, err
×
464
                }
×
465
        }
466

467
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
468
                return res, err
×
469
        }
×
470

471
        // Skip if schedule is disabled
472
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
473
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
474
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
475
                if err != nil {
2✔
476
                        return pollRes, err
1✔
477
                }
1✔
478
                res = pollRes
1✔
479
        }
480

481
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
482
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
483
        if digestUpdated {
2✔
484
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
485
                if dv != nil {
1✔
486
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
487
                                return res, err
×
488
                        }
×
489
                }
490
                if importSucceeded || len(imports) == 0 {
2✔
491
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
492
                                return res, err
1✔
493
                        }
1✔
494
                }
495
        } else if importSucceeded {
2✔
496
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
497
                        return res, err
×
498
                }
×
499
        } else if len(imports) > 0 {
2✔
500
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
501
        } else {
2✔
502
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
503
        }
1✔
504

505
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
506
                return res, err
×
507
        }
×
508

509
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
510
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
511
                        return res, err
×
512
                }
×
513
        }
514
        return res, nil
1✔
515
}
516

517
// Returns the current import DV if exists, and the last imported PVC
518
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
519
        imports := cron.Status.CurrentImports
1✔
520
        if len(imports) == 0 {
2✔
521
                return nil, nil, nil
1✔
522
        }
1✔
523

524
        dvName := imports[0].DataVolumeName
1✔
525
        dv := &cdiv1.DataVolume{}
1✔
526
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
527
                if !k8serrors.IsNotFound(err) {
1✔
528
                        return nil, nil, err
×
529
                }
×
530
                dv = nil
1✔
531
        }
532

533
        pvc := &corev1.PersistentVolumeClaim{}
1✔
534
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
535
                if !k8serrors.IsNotFound(err) {
1✔
536
                        return nil, nil, err
×
537
                }
×
538
                pvc = nil
1✔
539
        }
540
        return dv, pvc, nil
1✔
541
}
542

543
// Returns the current import DV if exists, and the last imported PVC
544
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
545
        imports := cron.Status.CurrentImports
1✔
546
        if len(imports) == 0 {
2✔
547
                return nil, nil
1✔
548
        }
1✔
549

550
        snapName := imports[0].DataVolumeName
1✔
551
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
552
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
553
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
554
                        return nil, err
×
555
                }
×
556
                return nil, nil
1✔
557
        }
558

559
        return snapshot, nil
1✔
560
}
561

562
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
563
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
564
        dataSource := &cdiv1.DataSource{}
1✔
565
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
566
                return nil, err
1✔
567
        }
1✔
568
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
569
                log := r.log.WithName("getCronManagedDataSource")
×
570
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
571
                return nil, ErrNotManagedByCron
×
572
        }
×
573
        return dataSource, nil
1✔
574
}
575

576
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
577
        objCopy := obj.DeepCopyObject()
1✔
578
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
579
        r.setDataImportCronResourceLabels(cron, obj)
1✔
580
        if !reflect.DeepEqual(obj, objCopy) {
2✔
581
                if err := r.client.Update(ctx, obj); err != nil {
1✔
582
                        return err
×
583
                }
×
584
        }
585
        return nil
1✔
586
}
587

588
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
589
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
590
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
591
                if cond.Status == corev1.ConditionFalse &&
×
592
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
593
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
594
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
595
                        dv.Labels[common.DataImportCronLabel] = ""
×
596
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
597
                                return err
×
598
                        }
×
599
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
600
                                return err
×
601
                        }
×
602
                        cron.Status.CurrentImports = nil
×
603
                }
604
        }
605
        return nil
×
606
}
607

608
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
609
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
610
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
611
        if err != nil {
1✔
612
                return err
×
613
        }
×
614
        if regSource.ImageStream == nil {
1✔
615
                return nil
×
616
        }
×
617
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
618
        if err != nil {
2✔
619
                return err
1✔
620
        }
1✔
621
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
622
        if err != nil {
2✔
623
                return err
1✔
624
        }
1✔
625
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
626
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
627
                log.Info("Updating DataImportCron", "digest", digest)
1✔
628
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
629
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
630
        }
1✔
631
        return nil
1✔
632
}
633

634
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
635
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
636
        podName := getPollerPodName(cron)
1✔
637
        ns := cron.Namespace
1✔
638
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
639
        pod := &corev1.Pod{}
1✔
640

1✔
641
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
642
                digest, err := fetchContainerImageDigest(pod)
1✔
643
                if err != nil || digest == "" {
1✔
NEW
644
                        return false, err
×
NEW
645
                }
×
646
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
647
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
648
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
649
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
650
                }
1✔
651
                return true, r.client.Delete(ctx, pod)
1✔
652
        } else if cc.IgnoreNotFound(err) != nil {
1✔
NEW
653
                return false, err
×
NEW
654
        }
×
655

656
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
657
        if err != nil {
1✔
NEW
658
                return false, err
×
NEW
659
        }
×
660

661
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
662

1✔
663
        pod = &corev1.Pod{
1✔
664
                ObjectMeta: metav1.ObjectMeta{
1✔
665
                        Name:      podName,
1✔
666
                        Namespace: ns,
1✔
667
                        OwnerReferences: []metav1.OwnerReference{
1✔
668
                                {
1✔
669
                                        APIVersion:         cron.APIVersion,
1✔
670
                                        Kind:               cron.Kind,
1✔
671
                                        Name:               cron.Name,
1✔
672
                                        UID:                cron.UID,
1✔
673
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
674
                                        Controller:         ptr.To[bool](true),
1✔
675
                                },
1✔
676
                        },
1✔
677
                },
1✔
678
                Spec: corev1.PodSpec{
1✔
679
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
680
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
681
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
682
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
683
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
684
                        Volumes: []corev1.Volume{
1✔
685
                                {
1✔
686
                                        Name: "shared-volume",
1✔
687
                                        VolumeSource: corev1.VolumeSource{
1✔
688
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
689
                                        },
1✔
690
                                },
1✔
691
                        },
1✔
692
                        InitContainers: []corev1.Container{
1✔
693
                                {
1✔
694
                                        Name:                     "init",
1✔
695
                                        Image:                    r.image,
1✔
696
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
697
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
698
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
699
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
700
                                },
1✔
701
                        },
1✔
702
                        Containers: []corev1.Container{
1✔
703
                                {
1✔
704
                                        Name:                     "image-container",
1✔
705
                                        Image:                    containerImage,
1✔
706
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
707
                                        Command:                  []string{"/shared/server", "-h"},
1✔
708
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
709
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
710
                                },
1✔
711
                        },
1✔
712
                },
1✔
713
        }
1✔
714

1✔
715
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
716
        if pod.Spec.SecurityContext != nil {
2✔
717
                pod.Spec.SecurityContext.FSGroup = nil
1✔
718
        }
1✔
719

720
        return false, r.client.Create(ctx, pod)
1✔
721
}
722

723
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
724
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
NEW
725
                return "", nil
×
NEW
726
        }
×
727

728
        status := pod.Status.ContainerStatuses[0]
1✔
729
        if status.State.Waiting != nil {
1✔
NEW
730
                reason := status.State.Waiting.Reason
×
NEW
731
                switch reason {
×
NEW
732
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
NEW
733
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
734
                }
NEW
735
                return "", nil
×
736
        }
737

738
        if status.State.Terminated == nil {
1✔
NEW
739
                return "", nil
×
NEW
740
        }
×
741

742
        imageID := status.ImageID
1✔
743
        if imageID == "" {
1✔
NEW
744
                return "", errors.Errorf("Container has no imageID")
×
NEW
745
        }
×
746
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
747
        if idx < 0 {
1✔
NEW
748
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
NEW
749
        }
×
750

751
        return imageID[idx:], nil
1✔
752
}
753

754
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
755
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
756
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
757
        if err != nil {
1✔
758
                return err
×
759
        }
×
760
        ns := pvcSource.Namespace
1✔
761
        if ns == "" {
2✔
762
                ns = dataImportCron.Namespace
1✔
763
        }
1✔
764
        pvc := &corev1.PersistentVolumeClaim{}
1✔
765
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
766
                return err
1✔
767
        }
1✔
768
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
769
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
770
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
771
                log.Info("Updating DataImportCron", "digest", digest)
1✔
772
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
773
        }
1✔
774
        return nil
1✔
775
}
776

777
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
778
        log := r.log.WithName("updateDataSource")
1✔
779
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
780
        if err != nil {
2✔
781
                if k8serrors.IsNotFound(err) {
2✔
782
                        dataSource = r.newDataSource(dataImportCron)
1✔
783
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
784
                                return err
×
785
                        }
×
786
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
787
                } else if errors.Is(err, ErrNotManagedByCron) {
×
788
                        return nil
×
789
                } else {
×
790
                        return err
×
791
                }
×
792
        }
793
        dataSourceCopy := dataSource.DeepCopy()
1✔
794
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
795

1✔
796
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
797
        populateDataSource(format, dataSource, sourcePVC)
1✔
798

1✔
799
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
800
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
801
                        return err
×
802
                }
×
803
        }
804

805
        return nil
1✔
806
}
807

808
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
809
        if sourcePVC == nil {
2✔
810
                return
1✔
811
        }
1✔
812

813
        switch format {
1✔
814
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
815
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
816
                        PVC: sourcePVC,
1✔
817
                }
1✔
818
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
819
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
820
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
821
                                Namespace: sourcePVC.Namespace,
1✔
822
                                Name:      sourcePVC.Name,
1✔
823
                        },
1✔
824
                }
1✔
825
        }
826
}
827

828
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
829
        if dataImportCron.Status.CurrentImports == nil {
1✔
830
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
831
        }
×
832
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
833
                Namespace: dataImportCron.Namespace,
1✔
834
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
835
        }
1✔
836
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
837
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
838
                now := metav1.Now()
1✔
839
                dataImportCron.Status.LastImportTimestamp = &now
1✔
840
        }
1✔
841
        return nil
1✔
842
}
843

844
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
845
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
846
        if lastTimeStr == "" {
2✔
847
                return nil
1✔
848
        }
1✔
849
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
850
        if err != nil {
1✔
851
                return err
×
852
        }
×
853
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
854
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
855
        }
1✔
856
        return nil
1✔
857
}
858

859
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
860
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
861
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
862
        if digest == "" {
1✔
863
                return nil
×
864
        }
×
865
        dvName, err := createDvName(dataSourceName, digest)
1✔
866
        if err != nil {
2✔
867
                return err
1✔
868
        }
1✔
869
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
870

1✔
871
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
872
        for _, src := range sources {
2✔
873
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
874
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
875
                                return err
×
876
                        }
×
877
                } else {
1✔
878
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
879
                                return err
×
880
                        }
×
881
                        // If source exists don't create DV
882
                        return nil
1✔
883
                }
884
        }
885

886
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
887
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
888
                return err
×
889
        }
×
890

891
        return nil
1✔
892
}
893

894
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
895
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
896
        if !ok {
1✔
897
                // nothing to delete
×
898
                return nil
×
899
        }
×
900
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
901
        if err != nil {
1✔
902
                return err
×
903
        }
×
904
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
905
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
906
        for _, src := range sources {
2✔
907
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
908
                        return err
×
909
                }
×
910
        }
911
        for _, src := range sources {
2✔
912
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
913
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
914
                }
×
915
        }
916
        // Only update desired storage class once garbage collection went through
917
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
918
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
919
        if err != nil {
1✔
920
                return err
×
921
        }
×
922

923
        return nil
1✔
924
}
925

926
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
927
        switch format {
1✔
928
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
929
                return nil
1✔
930
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
931
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
932
        default:
×
933
                return fmt.Errorf("unknown source format for snapshot")
×
934
        }
935
}
936

937
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
938
        if pvc == nil {
1✔
939
                return nil
×
940
        }
×
941
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
942
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
943
                return nil
1✔
944
        }
1✔
945
        storageProfile := &cdiv1.StorageProfile{}
1✔
946
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
947
                return err
×
948
        }
×
949
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
950
        if err != nil {
1✔
951
                return err
×
952
        }
×
953
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
954
                ObjectMeta: metav1.ObjectMeta{
1✔
955
                        Name:      pvc.Name,
1✔
956
                        Namespace: dataImportCron.Namespace,
1✔
957
                        Labels: map[string]string{
1✔
958
                                common.CDILabelKey:       common.CDILabelValue,
1✔
959
                                common.CDIComponentLabel: "",
1✔
960
                        },
1✔
961
                },
1✔
962
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
963
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
964
                                PersistentVolumeClaimName: &pvc.Name,
1✔
965
                        },
1✔
966
                        VolumeSnapshotClassName: &className,
1✔
967
                },
1✔
968
        }
1✔
969
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
970
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
971

1✔
972
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
973
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
974
                if !k8serrors.IsNotFound(err) {
1✔
975
                        return err
×
976
                }
×
977
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
978
                if pvc.Spec.VolumeMode != nil {
2✔
979
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
980
                }
1✔
981
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
982
                        return err
×
983
                }
×
984
        } else {
1✔
985
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
986
                        // Clean up DV/PVC as they are not needed anymore
1✔
987
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
988
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
989
                                return err
×
990
                        }
×
991
                }
992
        }
993

994
        return nil
1✔
995
}
996

997
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
998
        dataImportCron.Status.SourceFormat = &format
1✔
999

1✔
1000
        switch format {
1✔
1001
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1002
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1003
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1004
                if snapshot == nil {
2✔
1005
                        // Snapshot create/update will trigger reconcile
1✔
1006
                        return nil
1✔
1007
                }
1✔
1008
                if cc.IsSnapshotReady(snapshot) {
2✔
1009
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1010
                } else {
2✔
1011
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1012
                }
1✔
1013
        default:
×
1014
                return fmt.Errorf("unknown source format for snapshot")
×
1015
        }
1016

1017
        return nil
1✔
1018
}
1019

1020
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1021
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1022
        if desiredStorageClass == nil {
2✔
1023
                return format, nil
1✔
1024
        }
1✔
1025

1026
        storageProfile := &cdiv1.StorageProfile{}
1✔
1027
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1028
                return format, err
×
1029
        }
×
1030
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1031
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1032
        }
1✔
1033

1034
        return format, nil
1✔
1035
}
1036

1037
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1038
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1039
                return nil
×
1040
        }
×
1041
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1042
        if err != nil {
1✔
1043
                return err
×
1044
        }
×
1045

1046
        maxImports := defaultImportsToKeepPerCron
1✔
1047

1✔
1048
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1049
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1050
        }
1✔
1051

1052
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1053
                return err
×
1054
        }
×
1055
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
1056
                return err
×
1057
        }
×
1058

1059
        return nil
1✔
1060
}
1061

1062
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1063
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1064

1✔
1065
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1066
                return err
×
1067
        }
×
1068
        if len(pvcList.Items) > maxImports {
2✔
1069
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1070
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1071
                })
1✔
1072
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1073
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1074
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1075
                                return err
×
1076
                        }
×
1077
                }
1078
        }
1079

1080
        dvList := &cdiv1.DataVolumeList{}
1✔
1081
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1082
                return err
×
1083
        }
×
1084

1085
        if len(dvList.Items) > maxImports {
2✔
1086
                for _, dv := range dvList.Items {
2✔
1087
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1088
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
1089
                                return err
×
1090
                        }
×
1091

1092
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1093
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1094
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
1095
                                        return err
×
1096
                                }
×
1097
                        }
1098
                }
1099
        }
1100

1101
        return nil
1✔
1102
}
1103

1104
// deleteDvPvc deletes DV or PVC if DV was GCed
1105
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1106
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1107
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1108
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1109
                return err
1✔
1110
        }
1✔
1111
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1112
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1113
                return err
×
1114
        }
×
1115
        return nil
1✔
1116
}
1117

1118
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1119
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1120

1✔
1121
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1122
                if meta.IsNoMatchError(err) {
×
1123
                        return nil
×
1124
                }
×
1125
                return err
×
1126
        }
1127
        if len(snapList.Items) > maxImports {
1✔
1128
                sort.Slice(snapList.Items, func(i, j int) bool {
×
1129
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1130
                })
×
1131
                for _, snap := range snapList.Items[maxImports:] {
×
1132
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
1133
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1134
                                return err
×
1135
                        }
×
1136
                }
1137
        }
1138

1139
        return nil
1✔
1140
}
1141

1142
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1143
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1144
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1145

1✔
1146
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1147
                return err
×
1148
        }
×
1149
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1150
        if err != nil {
1✔
1151
                return err
×
1152
        }
×
1153
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1154
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1155
                return err
×
1156
        }
×
1157
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1158
                return err
×
1159
        }
×
1160
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1161
                return err
×
1162
        }
×
1163
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1164
                return err
×
1165
        }
×
1166
        return nil
1✔
1167
}
1168

1169
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1170
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1171
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1172
        if err != nil {
1✔
1173
                return err
×
1174
        }
×
1175
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1176
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1177
                return err
×
1178
        }
×
1179
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1180
                return err
×
1181
        }
×
1182

1183
        return nil
1✔
1184
}
1185

1186
// NewDataImportCronController creates a new instance of the DataImportCron controller
1187
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1188
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1189
                Scheme: mgr.GetScheme(),
×
1190
                Mapper: mgr.GetRESTMapper(),
×
1191
        })
×
1192
        if err != nil {
×
1193
                return nil, err
×
1194
        }
×
1195
        reconciler := &DataImportCronReconciler{
×
1196
                client:          mgr.GetClient(),
×
1197
                uncachedClient:  uncachedClient,
×
1198
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1199
                scheme:          mgr.GetScheme(),
×
1200
                log:             log.WithName(dataImportControllerName),
×
1201
                image:           importerImage,
×
1202
                pullPolicy:      pullPolicy,
×
1203
                cdiNamespace:    util.GetNamespace(),
×
1204
                installerLabels: installerLabels,
×
1205
        }
×
1206
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1207
                MaxConcurrentReconciles: 3,
×
1208
                Reconciler:              reconciler,
×
1209
        })
×
1210
        if err != nil {
×
1211
                return nil, err
×
1212
        }
×
1213
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1214
                return nil, err
×
1215
        }
×
1216
        log.Info("Initialized DataImportCron controller")
×
1217
        return dataImportCronController, nil
×
1218
}
1219

1220
func getCronName(obj client.Object) string {
×
1221
        return obj.GetLabels()[common.DataImportCronLabel]
×
1222
}
×
1223

1224
func getCronNs(obj client.Object) string {
×
1225
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1226
}
×
1227

1228
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1229
        if cronName := getCronName(obj); cronName != "" {
×
1230
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1231
        }
×
1232
        return nil
×
1233
}
1234

1235
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1236
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1237
                return err
×
1238
        }
×
1239

1240
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1241
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1242
                // Otherwise we risk losing the storage profile event
×
1243
                var crons cdiv1.DataImportCronList
×
1244
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1245
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1246
                        return nil
×
1247
                }
×
1248
                // Storage profiles are 1:1 to storage classes
1249
                scName := obj.GetName()
×
1250
                var reqs []reconcile.Request
×
1251
                for _, cron := range crons.Items {
×
1252
                        dataVolume := cron.Spec.Template
×
1253
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1254
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1255
                        if err != nil || templateSc == nil {
×
1256
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1257
                                return reqs
×
1258
                        }
×
1259
                        if templateSc.Name == scName {
×
1260
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1261
                        }
×
1262
                }
1263
                return reqs
×
1264
        }
1265

1266
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1267
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1268
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1269
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1270
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1271
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1272
                },
1273
        )); err != nil {
×
1274
                return err
×
1275
        }
×
1276

1277
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1278
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1279
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1280
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1281
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1282
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1283
                },
1284
        )); err != nil {
×
1285
                return err
×
1286
        }
×
1287

1288
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1289
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1290
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1291
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1292
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1293
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1294
                },
1295
        )); err != nil {
×
1296
                return err
×
1297
        }
×
1298

1299
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1300
                return err
×
1301
        }
×
1302

1303
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1304
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1305
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1306
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1307
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1308
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1309
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
1310
                        },
×
1311
                },
1312
        )); err != nil {
×
1313
                return err
×
1314
        }
×
1315

1316
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1317
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1318
        }
×
1319

1320
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1321
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1322
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1323
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1324
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1325
                        },
×
1326
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1327
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1328
                },
1329
        )); err != nil {
×
1330
                return err
×
1331
        }
×
1332

1333
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1334
                if meta.IsNoMatchError(err) {
×
1335
                        // Back out if there's no point to attempt watch
×
1336
                        return nil
×
1337
                }
×
1338
                if !cc.IsErrCacheNotStarted(err) {
×
1339
                        return err
×
1340
                }
×
1341
        }
1342
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1343
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1344
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1345
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1346
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1347
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1348
                },
1349
        )); err != nil {
×
1350
                return err
×
1351
        }
×
1352

1353
        return nil
×
1354
}
1355

1356
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1357
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1358
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1359
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1360
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1361
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1362
                                log.Info("Update", "sc", obj.GetName(),
×
1363
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1364
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1365
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1366
                                if err != nil {
×
1367
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1368
                                }
×
1369
                                return reqs
×
1370
                        },
1371
                ),
1372
                predicate.TypedFuncs[*storagev1.StorageClass]{
1373
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1374
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1375
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1376
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1377
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1378
                        },
×
1379
                },
1380
        )); err != nil {
×
1381
                return err
×
1382
        }
×
1383

1384
        return nil
×
1385
}
1386

1387
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1388
        dicList := &cdiv1.DataImportCronList{}
×
1389
        if err := c.List(ctx, dicList); err != nil {
×
1390
                return nil, err
×
1391
        }
×
1392
        reqs := []reconcile.Request{}
×
1393
        for _, dic := range dicList.Items {
×
1394
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1395
                        continue
×
1396
                }
1397

1398
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1399
        }
1400

1401
        return reqs, nil
×
1402
}
1403

1404
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1405
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1406
                return false, nil
1✔
1407
        }
1✔
1408

1409
        sc := pvc.Spec.StorageClassName
1✔
1410
        if sc == nil || *sc == desiredStorageClass {
2✔
1411
                return false, nil
1✔
1412
        }
1✔
1413

1414
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1415
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1416
                return false, err
×
1417
        }
×
1418

1419
        return true, nil
1✔
1420
}
1421

1422
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1423
        cronJob := &batchv1.CronJob{}
1✔
1424
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1425
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1426
                return false, cc.IgnoreNotFound(err)
1✔
1427
        }
1✔
1428

1429
        cronJobCopy := cronJob.DeepCopy()
1✔
1430
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1431
                return false, err
×
1432
        }
×
1433

1434
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1435
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1436
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1437
                        return false, cc.IgnoreNotFound(err)
×
1438
                }
×
1439
        }
1440
        return true, nil
1✔
1441
}
1442

1443
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1444
        cronJob := &batchv1.CronJob{
1✔
1445
                ObjectMeta: metav1.ObjectMeta{
1✔
1446
                        Name:      GetCronJobName(cron),
1✔
1447
                        Namespace: r.cdiNamespace,
1✔
1448
                },
1✔
1449
        }
1✔
1450
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1451
                return nil, err
×
1452
        }
×
1453
        return cronJob, nil
1✔
1454
}
1455

1456
// InitPollerPodSpec inits poller PodSpec
1457
func InitPollerPodSpec(c client.Client, cron *cdiv1.DataImportCron, podSpec *corev1.PodSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1458
        regSource, err := getCronRegistrySource(cron)
1✔
1459
        if err != nil {
1✔
1460
                return err
×
1461
        }
×
1462
        if regSource.URL == nil {
1✔
1463
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1464
        }
×
1465
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1466
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1467
                return err
×
1468
        }
×
1469
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1470
        if err != nil {
1✔
1471
                return err
×
1472
        }
×
1473
        container := corev1.Container{
1✔
1474
                Name:  "cdi-source-update-poller",
1✔
1475
                Image: image,
1✔
1476
                Command: []string{
1✔
1477
                        "/usr/bin/cdi-source-update-poller",
1✔
1478
                        "-ns", cron.Namespace,
1✔
1479
                        "-cron", cron.Name,
1✔
1480
                        "-url", *regSource.URL,
1✔
1481
                },
1✔
1482
                ImagePullPolicy:          pullPolicy,
1✔
1483
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1484
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1485
        }
1✔
1486

1✔
1487
        var volumes []corev1.Volume
1✔
1488
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1489
        if hasCertConfigMap {
1✔
1490
                vm := corev1.VolumeMount{
×
1491
                        Name:      CertVolName,
×
1492
                        MountPath: common.ImporterCertDir,
×
1493
                }
×
1494
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1495
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1496
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1497
        }
×
1498

1499
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1500
                vm := corev1.VolumeMount{
1✔
1501
                        Name:      ProxyCertVolName,
1✔
1502
                        MountPath: common.ImporterProxyCertDir,
1✔
1503
                }
1✔
1504
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1505
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1506
        }
1✔
1507

1508
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1509
                container.Env = append(container.Env,
×
1510
                        corev1.EnvVar{
×
1511
                                Name: common.ImporterAccessKeyID,
×
1512
                                ValueFrom: &corev1.EnvVarSource{
×
1513
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1514
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1515
                                                        Name: *regSource.SecretRef,
×
1516
                                                },
×
1517
                                                Key: common.KeyAccess,
×
1518
                                        },
×
1519
                                },
×
1520
                        },
×
1521
                        corev1.EnvVar{
×
1522
                                Name: common.ImporterSecretKey,
×
1523
                                ValueFrom: &corev1.EnvVarSource{
×
1524
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1525
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1526
                                                        Name: *regSource.SecretRef,
×
1527
                                                },
×
1528
                                                Key: common.KeySecret,
×
1529
                                        },
×
1530
                                },
×
1531
                        },
×
1532
                )
×
1533
        }
×
1534

1535
        addEnvVar := func(varName, value string) {
2✔
1536
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1537
        }
1✔
1538

1539
        if insecureTLS {
1✔
1540
                addEnvVar(common.InsecureTLSVar, "true")
×
1541
        }
×
1542

1543
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1544
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1545
                        addEnvVar(varName, value)
1✔
1546
                }
1✔
1547
        }
1548

1549
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1550
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1551
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1552

1✔
1553
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1554
        if err != nil {
1✔
1555
                return err
×
1556
        }
×
1557
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1558
        if err != nil {
1✔
1559
                return err
×
1560
        }
×
1561

1562
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1563
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1564
        podSpec.Containers = []corev1.Container{container}
1✔
1565
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1566
        podSpec.Volumes = volumes
1✔
1567
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1568
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1569
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1570
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1571

1✔
1572
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1573
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1574
        if podSpec.SecurityContext != nil {
2✔
1575
                podSpec.SecurityContext.FSGroup = nil
1✔
1576
        }
1✔
1577
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1578
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1579
        }
1✔
1580

1581
        return nil
1✔
1582
}
1583

1584
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1585
        cronJobSpec := &cronJob.Spec
1✔
1586
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1587
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1588
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1589
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1590

1✔
1591
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1592
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1593
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1594
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1595

1✔
1596
        podSpec := &jobSpec.Template.Spec
1✔
1597
        if err := InitPollerPodSpec(r.client, cron, podSpec, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1598
                return err
×
1599
        }
×
1600
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1601
                return err
×
1602
        }
×
1603
        return nil
1✔
1604
}
1605

1606
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1607
        job := &batchv1.Job{
1✔
1608
                ObjectMeta: metav1.ObjectMeta{
1✔
1609
                        Name:      GetInitialJobName(cron),
1✔
1610
                        Namespace: cronJob.Namespace,
1✔
1611
                },
1✔
1612
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1613
        }
1✔
1614
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1615
                return nil, err
×
1616
        }
×
1617
        return job, nil
1✔
1618
}
1619

1620
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1621
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1622
                return err
×
1623
        }
×
1624
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1625
        labels := obj.GetLabels()
1✔
1626
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1627
        labels[common.DataImportCronLabel] = cron.Name
1✔
1628
        obj.SetLabels(labels)
1✔
1629
        return nil
1✔
1630
}
1631

1632
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1633
        dv := cron.Spec.Template.DeepCopy()
1✔
1634
        if isCronRegistrySource(cron) {
2✔
1635
                var digestedURL string
1✔
1636
                if isURLSource(cron) {
2✔
1637
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1638
                } else if isImageStreamSource(cron) {
3✔
1639
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1640
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1641
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1642
                }
1✔
1643
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1644
        }
1645
        dv.Name = dataVolumeName
1✔
1646
        dv.Namespace = cron.Namespace
1✔
1647
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1648
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1649
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1650
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1651

1✔
1652
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1653
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1654
        }
1✔
1655

1656
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1657

1✔
1658
        return dv
1✔
1659
}
1660

1661
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1662
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1663
        labels := obj.GetLabels()
1✔
1664
        labels[common.DataImportCronLabel] = cron.Name
1✔
1665
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1666
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1667
        }
1✔
1668
        obj.SetLabels(labels)
1✔
1669
}
1670

1671
func untagDigestedDockerURL(dockerURL string) string {
1✔
1672
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1673
                url := u.Host + u.Path
1✔
1674
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1675
                // Check for tag
1✔
1676
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1677
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1678
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1679
                        }
1✔
1680
                }
1681
        }
1682
        return dockerURL
1✔
1683
}
1684

1685
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1686
        if val := cron.Labels[ann]; val != "" {
2✔
1687
                cc.AddLabel(dv, ann, val)
1✔
1688
        }
1✔
1689
}
1690

1691
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1692
        if val := cron.Annotations[ann]; val != "" {
1✔
1693
                cc.AddAnnotation(dv, ann, val)
×
1694
        }
×
1695
}
1696

1697
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1698
        dataSource := &cdiv1.DataSource{
1✔
1699
                ObjectMeta: metav1.ObjectMeta{
1✔
1700
                        Name:      cron.Spec.ManagedDataSource,
1✔
1701
                        Namespace: cron.Namespace,
1✔
1702
                },
1✔
1703
        }
1✔
1704
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1705
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1706
        return dataSource
1✔
1707
}
1✔
1708

1709
// Create DataVolume name based on the DataSource name + prefix of the digest
1710
func createDvName(prefix, digest string) (string, error) {
1✔
1711
        digestPrefix := ""
1✔
1712
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1713
                digestPrefix = digestSha256Prefix
1✔
1714
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1715
                digestPrefix = digestUIDPrefix
1✔
1716
        } else {
2✔
1717
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1718
        }
1✔
1719
        fromIdx := len(digestPrefix)
1✔
1720
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1721
        if len(digest) < toIdx {
2✔
1722
                return "", errors.Errorf("Digest is too short")
1✔
1723
        }
1✔
1724
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1725
}
1726

1727
// GetCronJobName get CronJob name based on cron name and UID
1728
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1729
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1730
}
1✔
1731

1732
// GetInitialJobName get initial job name based on cron name and UID
1733
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1734
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1735
}
1✔
1736

1737
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1738
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1✔
1739
}
1✔
1740

1741
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1742
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1743
}
1✔
1744

1745
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1746
        dv := &cron.Spec.Template
1✔
1747

1✔
1748
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1749
                return explicitVolumeMode, nil
×
1750
        }
×
1751

1752
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1753
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1754
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1755
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1756
                        AccessModes:      accessModes,
1✔
1757
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1758
                        Resources: corev1.VolumeResourceRequirements{
1✔
1759
                                Requests: corev1.ResourceList{
1✔
1760
                                        // Doesn't matter
1✔
1761
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1762
                                },
1✔
1763
                        },
1✔
1764
                },
1✔
1765
        }
1✔
1766
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1767
                return nil, err
×
1768
        }
×
1769

1770
        return inferredPvc.Spec.VolumeMode, nil
1✔
1771
}
1772

1773
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1774
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1775
        if dv.Spec.PVC != nil {
1✔
1776
                return dv.Spec.PVC.VolumeMode
×
1777
        }
×
1778

1779
        if dv.Spec.Storage != nil {
2✔
1780
                return dv.Spec.Storage.VolumeMode
1✔
1781
        }
1✔
1782

1783
        return nil
×
1784
}
1785

1786
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1787
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1788
        if dv.Spec.PVC != nil {
1✔
1789
                return dv.Spec.PVC.AccessModes
×
1790
        }
×
1791

1792
        if dv.Spec.Storage != nil {
2✔
1793
                return dv.Spec.Storage.AccessModes
1✔
1794
        }
1✔
1795

1796
        return nil
×
1797
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc