• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5597

18 Sep 2025 02:27PM UTC coverage: 59.206% (-0.005%) from 59.211%
#5597

push

travis-ci

web-flow
datasource-controller: decompose addDataSourceControllerWatches (#3889)

Previously the function was bigger than it needed to be, this decomposes
it to several helper functions for clarity.

Signed-off-by: Adi Aloni <aaloni@redhat.com>

0 of 53 new or added lines in 1 file covered. (0.0%)

175 existing lines in 3 files now uncovered.

17175 of 29009 relevant lines covered (59.21%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.62
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        batchv1 "k8s.io/api/batch/v1"
37
        corev1 "k8s.io/api/core/v1"
38
        storagev1 "k8s.io/api/storage/v1"
39
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
40
        "k8s.io/apimachinery/pkg/api/meta"
41
        "k8s.io/apimachinery/pkg/api/resource"
42
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
43
        "k8s.io/apimachinery/pkg/labels"
44
        "k8s.io/apimachinery/pkg/runtime"
45
        "k8s.io/apimachinery/pkg/types"
46
        "k8s.io/client-go/tools/record"
47
        openapicommon "k8s.io/kube-openapi/pkg/common"
48
        "k8s.io/utils/ptr"
49

50
        "sigs.k8s.io/controller-runtime/pkg/client"
51
        "sigs.k8s.io/controller-runtime/pkg/controller"
52
        "sigs.k8s.io/controller-runtime/pkg/event"
53
        "sigs.k8s.io/controller-runtime/pkg/handler"
54
        "sigs.k8s.io/controller-runtime/pkg/manager"
55
        "sigs.k8s.io/controller-runtime/pkg/predicate"
56
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
57
        "sigs.k8s.io/controller-runtime/pkg/source"
58

59
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
60
        "kubevirt.io/containerized-data-importer/pkg/common"
61
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
62
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
63
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
64
        "kubevirt.io/containerized-data-importer/pkg/operator"
65
        "kubevirt.io/containerized-data-importer/pkg/util"
66
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
67
)
68

69
const (
70
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
71
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
72
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
73
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
74
)
75

76
// DataImportCronReconciler members
77
type DataImportCronReconciler struct {
78
        client          client.Client
79
        uncachedClient  client.Client
80
        recorder        record.EventRecorder
81
        scheme          *runtime.Scheme
82
        log             logr.Logger
83
        image           string
84
        pullPolicy      string
85
        cdiNamespace    string
86
        installerLabels map[string]string
87
}
88

89
const (
90
        // AnnSourceDesiredDigest is the digest of the pending updated image
91
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
92
        // AnnImageStreamDockerRef is the ImageStream Docker reference
93
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
94
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
95
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
96
        // AnnLastCronTime is the cron last execution time stamp
97
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
98
        // AnnLastUseTime is the PVC last use time stamp
99
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
100
        // AnnStorageClass is the cron DV's storage class
101
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
102

103
        dataImportControllerName    = "dataimportcron-controller"
104
        digestSha256Prefix          = "sha256:"
105
        digestUIDPrefix             = "uid:"
106
        digestDvNameSuffixLength    = 12
107
        cronJobUIDSuffixLength      = 8
108
        defaultImportsToKeepPerCron = 3
109
)
110

111
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
112

113
// Reconcile loop for DataImportCronReconciler
114
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
115
        dataImportCron := &cdiv1.DataImportCron{}
1✔
116
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
117
                return reconcile.Result{}, err
1✔
118
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
119
                err := r.cleanup(ctx, req.NamespacedName)
1✔
120
                return reconcile.Result{}, err
1✔
121
        }
1✔
122
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
123
        if !shouldReconcile || err != nil {
1✔
124
                return reconcile.Result{}, err
×
125
        }
×
126

127
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
128
                return reconcile.Result{}, err
×
129
        }
×
130

131
        return r.update(ctx, dataImportCron)
1✔
132
}
133

134
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
135
        dataSource := &cdiv1.DataSource{}
1✔
136
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
137
                if k8serrors.IsNotFound(err) {
2✔
138
                        return true, nil
1✔
139
                }
1✔
140
                return false, err
×
141
        }
142
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
143
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
144
                return true, nil
1✔
145
        }
1✔
146
        otherCron := &cdiv1.DataImportCron{}
1✔
147
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
148
                if k8serrors.IsNotFound(err) {
2✔
149
                        return true, nil
1✔
150
                }
1✔
151
                return false, err
×
152
        }
153
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
154
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
155
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
156
                r.log.V(3).Info(msg)
1✔
157
                return false, nil
1✔
158
        }
1✔
159
        return true, nil
×
160
}
161

162
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
163
        if dataImportCron.Spec.Schedule == "" {
2✔
164
                return nil
1✔
165
        }
1✔
166
        if isControllerPolledSource(dataImportCron) {
2✔
167
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
168
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
169
                }
1✔
170
                return nil
1✔
171
        }
172
        if !isURLSource(dataImportCron) {
1✔
173
                return nil
×
174
        }
×
175
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
176
        if exists || err != nil {
2✔
177
                return err
1✔
178
        }
1✔
179
        cronJob, err := r.newCronJob(dataImportCron)
1✔
180
        if err != nil {
1✔
181
                return err
×
182
        }
×
183
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
184
                return err
×
185
        }
×
186
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
187
        if err != nil {
1✔
188
                return err
×
189
        }
×
190
        if err := r.client.Create(ctx, job); err != nil {
1✔
191
                return err
×
192
        }
×
193
        return nil
1✔
194
}
195

196
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
197
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
198
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
199
        }
×
200
        imageStream := &imagev1.ImageStream{}
1✔
201
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
202
        if err != nil {
2✔
203
                return nil, "", err
1✔
204
        }
1✔
205
        imageStreamNamespacedName := types.NamespacedName{
1✔
206
                Namespace: imageStreamNamespace,
1✔
207
                Name:      name,
1✔
208
        }
1✔
209
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
210
                return nil, "", err
1✔
211
        }
1✔
212
        return imageStream, tag, nil
1✔
213
}
214

215
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
216
        if imageStream == nil {
1✔
217
                return "", "", errors.Errorf("No ImageStream")
×
218
        }
×
219
        tags := imageStream.Status.Tags
1✔
220
        if len(tags) == 0 {
1✔
221
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
222
        }
×
223

224
        tagIdx := 0
1✔
225
        if len(imageStreamTag) > 0 {
2✔
226
                tagIdx = -1
1✔
227
                for i, tag := range tags {
2✔
228
                        if tag.Tag == imageStreamTag {
2✔
229
                                tagIdx = i
1✔
230
                                break
1✔
231
                        }
232
                }
233
        }
234
        if tagIdx == -1 {
2✔
235
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
236
        }
1✔
237

238
        if len(tags[tagIdx].Items) == 0 {
2✔
239
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
240
        }
1✔
241
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
242
}
243

244
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
245
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
246
                return imageStreamName, "", nil
1✔
247
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
248
                return subs[0], subs[1], nil
1✔
249
        }
1✔
250
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
251
}
252

253
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
254
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
255
        if nextTimeStr == "" {
1✔
256
                return r.setNextCronTime(dataImportCron)
×
257
        }
×
258
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
259
        if err != nil {
1✔
260
                return reconcile.Result{}, err
×
261
        }
×
262
        if nextTime.After(time.Now()) {
2✔
263
                return r.setNextCronTime(dataImportCron)
1✔
264
        }
1✔
265
        switch {
1✔
266
        case isImageStreamSource(dataImportCron):
1✔
267
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
268
                        return reconcile.Result{}, err
1✔
269
                }
1✔
270
        case isPvcSource(dataImportCron):
1✔
271
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
272
                        return reconcile.Result{}, err
1✔
273
                }
1✔
274
        case isNodePull(dataImportCron):
1✔
275
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
276
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
277
                } else if err != nil {
2✔
278
                        return reconcile.Result{}, err
×
279
                }
×
280
        }
281

282
        return r.setNextCronTime(dataImportCron)
1✔
283
}
284

285
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
286
        now := time.Now()
1✔
287
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
288
        if err != nil {
1✔
289
                return reconcile.Result{}, err
×
290
        }
×
291
        nextTime := expr.Next(now)
1✔
292
        requeueAfter := nextTime.Sub(now)
1✔
293
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
294
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
295
        return res, err
1✔
296
}
297

298
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
299
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
300
        return err == nil && regSource.ImageStream != nil
1✔
301
}
1✔
302

303
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
304
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
305
        return err == nil && regSource.URL != nil
1✔
306
}
1✔
307

308
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
309
        regSource, err := getCronRegistrySource(cron)
1✔
310
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
311
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
312
}
1✔
313

314
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
315
        if !isCronRegistrySource(cron) {
2✔
316
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
317
        }
1✔
318
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
319
}
320

321
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
322
        source := cron.Spec.Template.Spec.Source
1✔
323
        return source != nil && source.Registry != nil
1✔
324
}
1✔
325

326
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
327
        if !isPvcSource(cron) {
1✔
328
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
329
        }
×
330
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
331
}
332

333
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
334
        source := cron.Spec.Template.Spec.Source
1✔
335
        return source != nil && source.PVC != nil
1✔
336
}
1✔
337

338
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
339
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
340
}
1✔
341

342
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
343
        res := reconcile.Result{}
1✔
344

1✔
345
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
346
        if err != nil {
1✔
347
                return res, err
×
348
        }
×
349

350
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
351
        imports := dataImportCron.Status.CurrentImports
1✔
352
        importSucceeded := false
1✔
353

1✔
354
        dataVolume := dataImportCron.Spec.Template
1✔
355
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
356
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
357
        if err != nil {
1✔
358
                return res, err
×
359
        }
×
360
        if desiredStorageClass != nil {
2✔
361
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
362
                        return res, err
1✔
363
                }
1✔
364
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
365
                desiredSc := desiredStorageClass.Name
1✔
366
                if hasCurrent && currentSc != desiredSc {
2✔
367
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
368
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
369
                                return res, err
×
370
                        }
×
371
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
372
                }
373
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
374
        }
375
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
376
        if err != nil {
1✔
377
                return res, err
×
378
        }
×
379
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
380
        if err != nil {
1✔
381
                return res, err
×
382
        }
×
383

384
        handlePopulatedPvc := func() error {
2✔
385
                if pvc != nil {
2✔
386
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
387
                                return err
×
388
                        }
×
389
                }
390
                importSucceeded = true
1✔
391
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
392
                        return err
×
393
                }
×
394

395
                return nil
1✔
396
        }
397

398
        switch {
1✔
399
        case dv != nil:
1✔
400
                switch dv.Status.Phase {
1✔
401
                case cdiv1.Succeeded:
1✔
402
                        if err := handlePopulatedPvc(); err != nil {
1✔
403
                                return res, err
×
404
                        }
×
405
                case cdiv1.ImportScheduled:
1✔
406
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
407
                case cdiv1.ImportInProgress:
1✔
408
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
409
                default:
1✔
410
                        dvPhase := string(dv.Status.Phase)
1✔
411
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
412
                }
413
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
414
                if err := handlePopulatedPvc(); err != nil {
1✔
415
                        return res, err
×
416
                }
×
417
        case snapshot != nil:
1✔
418
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
419
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
420
                                return res, err
×
421
                        }
×
422
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
423
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
424
                }
425
                // Below k8s 1.29 there's no way to know the source volume mode
426
                // Let's at least expose this info on our own snapshots
427
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
428
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
429
                        if err != nil {
1✔
430
                                return res, err
×
431
                        }
×
432
                        if volMode != nil {
2✔
433
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
434
                        }
1✔
435
                }
436
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
437
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
438
                if err != nil {
2✔
439
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
440
                                return res, err
×
441
                        }
×
442
                } else {
1✔
443
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
444
                }
1✔
445
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
446
                        return res, err
×
447
                }
×
448
                importSucceeded = true
1✔
449
        default:
1✔
450
                if len(imports) > 0 {
2✔
451
                        imports = imports[1:]
1✔
452
                        dataImportCron.Status.CurrentImports = imports
1✔
453
                }
1✔
454
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
455
        }
456

457
        if importSucceeded {
2✔
458
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
459
                        return res, err
×
460
                }
×
461
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
462
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
463
                        return res, err
×
464
                }
×
465
        }
466

467
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
468
                return res, err
×
469
        }
×
470

471
        // Skip if schedule is disabled
472
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
473
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
474
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
475
                if err != nil {
2✔
476
                        return pollRes, err
1✔
477
                }
1✔
478
                res = pollRes
1✔
479
        }
480

481
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
482
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
483
        if digestUpdated {
2✔
484
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
485
                if dv != nil {
1✔
486
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
487
                                return res, err
×
488
                        }
×
489
                }
490
                if importSucceeded || len(imports) == 0 {
2✔
491
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
492
                                return res, err
1✔
493
                        }
1✔
494
                }
495
        } else if importSucceeded {
2✔
496
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
497
                        return res, err
×
498
                }
×
499
        } else if len(imports) > 0 {
2✔
500
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
501
        } else {
2✔
502
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
503
        }
1✔
504

505
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
506
                return res, err
×
507
        }
×
508

509
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
510
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
511
                        return res, err
×
512
                }
×
513
        }
514
        return res, nil
1✔
515
}
516

517
// Returns the current import DV if exists, and the last imported PVC
518
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
519
        imports := cron.Status.CurrentImports
1✔
520
        if len(imports) == 0 {
2✔
521
                return nil, nil, nil
1✔
522
        }
1✔
523

524
        dvName := imports[0].DataVolumeName
1✔
525
        dv := &cdiv1.DataVolume{}
1✔
526
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
527
                if !k8serrors.IsNotFound(err) {
1✔
528
                        return nil, nil, err
×
529
                }
×
530
                dv = nil
1✔
531
        }
532

533
        pvc := &corev1.PersistentVolumeClaim{}
1✔
534
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
535
                if !k8serrors.IsNotFound(err) {
1✔
536
                        return nil, nil, err
×
537
                }
×
538
                pvc = nil
1✔
539
        }
540
        return dv, pvc, nil
1✔
541
}
542

543
// Returns the current import DV if exists, and the last imported PVC
544
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
545
        imports := cron.Status.CurrentImports
1✔
546
        if len(imports) == 0 {
2✔
547
                return nil, nil
1✔
548
        }
1✔
549

550
        snapName := imports[0].DataVolumeName
1✔
551
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
552
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
553
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
554
                        return nil, err
×
555
                }
×
556
                return nil, nil
1✔
557
        }
558

559
        return snapshot, nil
1✔
560
}
561

562
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
563
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
564
        dataSource := &cdiv1.DataSource{}
1✔
565
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
566
                return nil, err
1✔
567
        }
1✔
568
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
569
                log := r.log.WithName("getCronManagedDataSource")
×
570
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
571
                return nil, ErrNotManagedByCron
×
572
        }
×
573
        return dataSource, nil
1✔
574
}
575

576
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
577
        objCopy := obj.DeepCopyObject()
1✔
578
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
579
        r.setDataImportCronResourceLabels(cron, obj)
1✔
580
        if !reflect.DeepEqual(obj, objCopy) {
2✔
581
                if err := r.client.Update(ctx, obj); err != nil {
1✔
582
                        return err
×
583
                }
×
584
        }
585
        return nil
1✔
586
}
587

588
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
589
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
590
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
591
                if cond.Status == corev1.ConditionFalse &&
×
592
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
593
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
594
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
595
                        dv.Labels[common.DataImportCronLabel] = ""
×
596
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
597
                                return err
×
598
                        }
×
599
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
600
                                return err
×
601
                        }
×
602
                        cron.Status.CurrentImports = nil
×
603
                }
604
        }
605
        return nil
×
606
}
607

608
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
609
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
610
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
611
        if err != nil {
1✔
612
                return err
×
613
        }
×
614
        if regSource.ImageStream == nil {
1✔
615
                return nil
×
616
        }
×
617
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
618
        if err != nil {
2✔
619
                return err
1✔
620
        }
1✔
621
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
622
        if err != nil {
2✔
623
                return err
1✔
624
        }
1✔
625
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
626
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
627
                log.Info("Updating DataImportCron", "digest", digest)
1✔
628
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
629
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
630
        }
1✔
631
        return nil
1✔
632
}
633

634
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
635
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
636
        podName := getPollerPodName(cron)
1✔
637
        ns := cron.Namespace
1✔
638
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
639
        pod := &corev1.Pod{}
1✔
640

1✔
641
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
642
                digest, err := fetchContainerImageDigest(pod)
1✔
643
                if err != nil || digest == "" {
1✔
644
                        return false, err
×
645
                }
×
646
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
647
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
648
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
649
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
650
                }
1✔
651
                return true, r.client.Delete(ctx, pod)
1✔
652
        } else if cc.IgnoreNotFound(err) != nil {
1✔
653
                return false, err
×
654
        }
×
655

656
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
657
        if err != nil {
1✔
658
                return false, err
×
659
        }
×
660
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
1✔
661
        if platform != nil && platform.Architecture != "" {
1✔
UNCOV
662
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
UNCOV
663
        }
×
664

665
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
666

1✔
667
        pod = &corev1.Pod{
1✔
668
                ObjectMeta: metav1.ObjectMeta{
1✔
669
                        Name:      podName,
1✔
670
                        Namespace: ns,
1✔
671
                        OwnerReferences: []metav1.OwnerReference{
1✔
672
                                {
1✔
673
                                        APIVersion:         cron.APIVersion,
1✔
674
                                        Kind:               cron.Kind,
1✔
675
                                        Name:               cron.Name,
1✔
676
                                        UID:                cron.UID,
1✔
677
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
678
                                        Controller:         ptr.To[bool](true),
1✔
679
                                },
1✔
680
                        },
1✔
681
                },
1✔
682
                Spec: corev1.PodSpec{
1✔
683
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
684
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
685
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
686
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
687
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
688
                        Volumes: []corev1.Volume{
1✔
689
                                {
1✔
690
                                        Name: "shared-volume",
1✔
691
                                        VolumeSource: corev1.VolumeSource{
1✔
692
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
693
                                        },
1✔
694
                                },
1✔
695
                        },
1✔
696
                        InitContainers: []corev1.Container{
1✔
697
                                {
1✔
698
                                        Name:                     "init",
1✔
699
                                        Image:                    r.image,
1✔
700
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
701
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
702
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
703
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
704
                                },
1✔
705
                        },
1✔
706
                        Containers: []corev1.Container{
1✔
707
                                {
1✔
708
                                        Name:                     "image-container",
1✔
709
                                        Image:                    containerImage,
1✔
710
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
711
                                        Command:                  []string{"/shared/server", "-h"},
1✔
712
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
713
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
714
                                },
1✔
715
                        },
1✔
716
                },
1✔
717
        }
1✔
718

1✔
719
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
720
        if pod.Spec.SecurityContext != nil {
2✔
721
                pod.Spec.SecurityContext.FSGroup = nil
1✔
722
        }
1✔
723

724
        return false, r.client.Create(ctx, pod)
1✔
725
}
726

727
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
728
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
UNCOV
729
                return "", nil
×
730
        }
×
731

732
        status := pod.Status.ContainerStatuses[0]
1✔
733
        if status.State.Waiting != nil {
1✔
UNCOV
734
                reason := status.State.Waiting.Reason
×
735
                switch reason {
×
UNCOV
736
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
UNCOV
737
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
738
                }
739
                return "", nil
×
740
        }
741

742
        if status.State.Terminated == nil {
1✔
UNCOV
743
                return "", nil
×
744
        }
×
745

746
        imageID := status.ImageID
1✔
747
        if imageID == "" {
1✔
748
                return "", errors.Errorf("Container has no imageID")
×
749
        }
×
750
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
751
        if idx < 0 {
1✔
UNCOV
752
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
UNCOV
753
        }
×
754

755
        return imageID[idx:], nil
1✔
756
}
757

758
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
759
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
760
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
761
        if err != nil {
1✔
UNCOV
762
                return err
×
UNCOV
763
        }
×
764
        ns := pvcSource.Namespace
1✔
765
        if ns == "" {
2✔
766
                ns = dataImportCron.Namespace
1✔
767
        }
1✔
768
        pvc := &corev1.PersistentVolumeClaim{}
1✔
769
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
770
                return err
1✔
771
        }
1✔
772
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
773
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
774
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
775
                log.Info("Updating DataImportCron", "digest", digest)
1✔
776
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
777
        }
1✔
778
        return nil
1✔
779
}
780

781
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
782
        log := r.log.WithName("updateDataSource")
1✔
783
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
784
        if err != nil {
2✔
785
                if k8serrors.IsNotFound(err) {
2✔
786
                        dataSource = r.newDataSource(dataImportCron)
1✔
787
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
788
                                return err
×
789
                        }
×
790
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
791
                } else if errors.Is(err, ErrNotManagedByCron) {
×
UNCOV
792
                        return nil
×
UNCOV
793
                } else {
×
UNCOV
794
                        return err
×
UNCOV
795
                }
×
796
        }
797
        dataSourceCopy := dataSource.DeepCopy()
1✔
798
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
799

1✔
800
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
801
        populateDataSource(format, dataSource, sourcePVC)
1✔
802

1✔
803
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
804
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
UNCOV
805
                        return err
×
UNCOV
806
                }
×
807
        }
808

809
        return nil
1✔
810
}
811

812
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
813
        if sourcePVC == nil {
2✔
814
                return
1✔
815
        }
1✔
816

817
        switch format {
1✔
818
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
819
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
820
                        PVC: sourcePVC,
1✔
821
                }
1✔
822
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
823
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
824
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
825
                                Namespace: sourcePVC.Namespace,
1✔
826
                                Name:      sourcePVC.Name,
1✔
827
                        },
1✔
828
                }
1✔
829
        }
830
}
831

832
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
833
        if dataImportCron.Status.CurrentImports == nil {
1✔
UNCOV
834
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
UNCOV
835
        }
×
836
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
837
                Namespace: dataImportCron.Namespace,
1✔
838
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
839
        }
1✔
840
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
841
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
842
                now := metav1.Now()
1✔
843
                dataImportCron.Status.LastImportTimestamp = &now
1✔
844
        }
1✔
845
        return nil
1✔
846
}
847

848
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
849
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
850
        if lastTimeStr == "" {
2✔
851
                return nil
1✔
852
        }
1✔
853
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
854
        if err != nil {
1✔
UNCOV
855
                return err
×
UNCOV
856
        }
×
857
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
858
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
859
        }
1✔
860
        return nil
1✔
861
}
862

863
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
864
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
865
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
866
        if digest == "" {
1✔
UNCOV
867
                return nil
×
UNCOV
868
        }
×
869
        dvName, err := createDvName(dataSourceName, digest)
1✔
870
        if err != nil {
2✔
871
                return err
1✔
872
        }
1✔
873
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
874

1✔
875
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
876
        for _, src := range sources {
2✔
877
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
878
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
879
                                return err
×
880
                        }
×
881
                } else {
1✔
882
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
UNCOV
883
                                return err
×
UNCOV
884
                        }
×
885
                        // If source exists don't create DV
886
                        return nil
1✔
887
                }
888
        }
889

890
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
891
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
UNCOV
892
                return err
×
UNCOV
893
        }
×
894

895
        return nil
1✔
896
}
897

898
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
899
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
900
        if !ok {
1✔
UNCOV
901
                // nothing to delete
×
902
                return nil
×
903
        }
×
904
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
905
        if err != nil {
1✔
UNCOV
906
                return err
×
UNCOV
907
        }
×
908
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
909
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
910
        for _, src := range sources {
2✔
911
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
UNCOV
912
                        return err
×
913
                }
×
914
        }
915
        for _, src := range sources {
2✔
916
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
UNCOV
917
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
UNCOV
918
                }
×
919
        }
920
        // Only update desired storage class once garbage collection went through
921
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
922
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
923
        if err != nil {
1✔
UNCOV
924
                return err
×
UNCOV
925
        }
×
926

927
        return nil
1✔
928
}
929

930
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
931
        switch format {
1✔
932
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
933
                return nil
1✔
934
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
935
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
UNCOV
936
        default:
×
UNCOV
937
                return fmt.Errorf("unknown source format for snapshot")
×
938
        }
939
}
940

941
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
942
        if pvc == nil {
1✔
UNCOV
943
                return nil
×
UNCOV
944
        }
×
945
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
946
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
947
                return nil
1✔
948
        }
1✔
949
        storageProfile := &cdiv1.StorageProfile{}
1✔
950
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
951
                return err
×
952
        }
×
953
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
954
        if err != nil {
1✔
UNCOV
955
                return err
×
UNCOV
956
        }
×
957
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
958
                ObjectMeta: metav1.ObjectMeta{
1✔
959
                        Name:      pvc.Name,
1✔
960
                        Namespace: dataImportCron.Namespace,
1✔
961
                        Labels: map[string]string{
1✔
962
                                common.CDILabelKey:       common.CDILabelValue,
1✔
963
                                common.CDIComponentLabel: "",
1✔
964
                        },
1✔
965
                },
1✔
966
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
967
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
968
                                PersistentVolumeClaimName: &pvc.Name,
1✔
969
                        },
1✔
970
                        VolumeSnapshotClassName: &className,
1✔
971
                },
1✔
972
        }
1✔
973
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
974
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
975

1✔
976
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
977
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
978
                if !k8serrors.IsNotFound(err) {
1✔
UNCOV
979
                        return err
×
UNCOV
980
                }
×
981
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
982
                if pvc.Spec.VolumeMode != nil {
2✔
983
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
984
                }
1✔
985
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
UNCOV
986
                        return err
×
UNCOV
987
                }
×
988
        } else {
1✔
989
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
990
                        // Clean up DV/PVC as they are not needed anymore
1✔
991
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
992
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
UNCOV
993
                                return err
×
UNCOV
994
                        }
×
995
                }
996
        }
997

998
        return nil
1✔
999
}
1000

1001
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1002
        dataImportCron.Status.SourceFormat = &format
1✔
1003

1✔
1004
        switch format {
1✔
1005
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1006
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1007
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1008
                if snapshot == nil {
2✔
1009
                        // Snapshot create/update will trigger reconcile
1✔
1010
                        return nil
1✔
1011
                }
1✔
1012
                if cc.IsSnapshotReady(snapshot) {
2✔
1013
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1014
                } else {
2✔
1015
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1016
                }
1✔
UNCOV
1017
        default:
×
UNCOV
1018
                return fmt.Errorf("unknown source format for snapshot")
×
1019
        }
1020

1021
        return nil
1✔
1022
}
1023

1024
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1025
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1026
        if desiredStorageClass == nil {
2✔
1027
                return format, nil
1✔
1028
        }
1✔
1029

1030
        storageProfile := &cdiv1.StorageProfile{}
1✔
1031
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
UNCOV
1032
                return format, err
×
UNCOV
1033
        }
×
1034
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1035
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1036
        }
1✔
1037

1038
        return format, nil
1✔
1039
}
1040

1041
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1042
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1043
                return nil
×
1044
        }
×
1045
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1046
        if err != nil {
1✔
UNCOV
1047
                return err
×
UNCOV
1048
        }
×
1049

1050
        maxImports := defaultImportsToKeepPerCron
1✔
1051

1✔
1052
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1053
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1054
        }
1✔
1055

1056
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1057
                return err
×
UNCOV
1058
        }
×
1059
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
UNCOV
1060
                return err
×
UNCOV
1061
        }
×
1062

1063
        return nil
1✔
1064
}
1065

1066
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1067
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1068

1✔
1069
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
UNCOV
1070
                return err
×
UNCOV
1071
        }
×
1072
        if len(pvcList.Items) > maxImports {
2✔
1073
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1074
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1075
                })
1✔
1076
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1077
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1078
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
UNCOV
1079
                                return err
×
UNCOV
1080
                        }
×
1081
                }
1082
        }
1083

1084
        dvList := &cdiv1.DataVolumeList{}
1✔
1085
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
UNCOV
1086
                return err
×
UNCOV
1087
        }
×
1088

1089
        if len(dvList.Items) > maxImports {
2✔
1090
                for _, dv := range dvList.Items {
2✔
1091
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1092
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
UNCOV
1093
                                return err
×
UNCOV
1094
                        }
×
1095

1096
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1097
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1098
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
UNCOV
1099
                                        return err
×
UNCOV
1100
                                }
×
1101
                        }
1102
                }
1103
        }
1104

1105
        return nil
1✔
1106
}
1107

1108
// deleteDvPvc deletes DV or PVC if DV was GCed
1109
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1110
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1111
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1112
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1113
                return err
1✔
1114
        }
1✔
1115
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1116
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
UNCOV
1117
                return err
×
UNCOV
1118
        }
×
1119
        return nil
1✔
1120
}
1121

1122
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1123
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1124

1✔
1125
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
UNCOV
1126
                if meta.IsNoMatchError(err) {
×
UNCOV
1127
                        return nil
×
1128
                }
×
1129
                return err
×
1130
        }
1131
        if len(snapList.Items) > maxImports {
1✔
1132
                sort.Slice(snapList.Items, func(i, j int) bool {
×
1133
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1134
                })
×
1135
                for _, snap := range snapList.Items[maxImports:] {
×
UNCOV
1136
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
UNCOV
1137
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
UNCOV
1138
                                return err
×
UNCOV
1139
                        }
×
1140
                }
1141
        }
1142

1143
        return nil
1✔
1144
}
1145

1146
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1147
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1148
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1149

1✔
1150
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1151
                return err
×
1152
        }
×
1153
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1154
        if err != nil {
1✔
1155
                return err
×
1156
        }
×
1157
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1158
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1159
                return err
×
UNCOV
1160
        }
×
1161
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1162
                return err
×
UNCOV
1163
        }
×
1164
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1165
                return err
×
UNCOV
1166
        }
×
1167
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
UNCOV
1168
                return err
×
UNCOV
1169
        }
×
1170
        return nil
1✔
1171
}
1172

1173
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1174
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1175
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1176
        if err != nil {
1✔
1177
                return err
×
1178
        }
×
1179
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1180
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1181
                return err
×
UNCOV
1182
        }
×
1183
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
UNCOV
1184
                return err
×
UNCOV
1185
        }
×
1186

1187
        return nil
1✔
1188
}
1189

1190
// NewDataImportCronController creates a new instance of the DataImportCron controller
1191
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1192
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1193
                Scheme: mgr.GetScheme(),
×
1194
                Mapper: mgr.GetRESTMapper(),
×
1195
        })
×
1196
        if err != nil {
×
1197
                return nil, err
×
1198
        }
×
1199
        reconciler := &DataImportCronReconciler{
×
1200
                client:          mgr.GetClient(),
×
1201
                uncachedClient:  uncachedClient,
×
1202
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1203
                scheme:          mgr.GetScheme(),
×
1204
                log:             log.WithName(dataImportControllerName),
×
1205
                image:           importerImage,
×
1206
                pullPolicy:      pullPolicy,
×
1207
                cdiNamespace:    util.GetNamespace(),
×
1208
                installerLabels: installerLabels,
×
1209
        }
×
1210
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1211
                MaxConcurrentReconciles: 3,
×
1212
                Reconciler:              reconciler,
×
1213
        })
×
1214
        if err != nil {
×
1215
                return nil, err
×
1216
        }
×
1217
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
UNCOV
1218
                return nil, err
×
UNCOV
1219
        }
×
1220
        log.Info("Initialized DataImportCron controller")
×
1221
        return dataImportCronController, nil
×
1222
}
1223

1224
func getCronName(obj client.Object) string {
×
1225
        return obj.GetLabels()[common.DataImportCronLabel]
×
1226
}
×
1227

1228
func getCronNs(obj client.Object) string {
×
1229
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1230
}
×
1231

1232
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
UNCOV
1233
        if cronName := getCronName(obj); cronName != "" {
×
UNCOV
1234
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1235
        }
×
1236
        return nil
×
1237
}
1238

UNCOV
1239
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1240
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1241
                return err
×
1242
        }
×
1243

1244
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1245
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1246
                // Otherwise we risk losing the storage profile event
×
1247
                var crons cdiv1.DataImportCronList
×
UNCOV
1248
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1249
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1250
                        return nil
×
1251
                }
×
1252
                // Storage profiles are 1:1 to storage classes
1253
                scName := obj.GetName()
×
1254
                var reqs []reconcile.Request
×
1255
                for _, cron := range crons.Items {
×
1256
                        dataVolume := cron.Spec.Template
×
1257
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1258
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1259
                        if err != nil || templateSc == nil {
×
1260
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1261
                                return reqs
×
UNCOV
1262
                        }
×
1263
                        if templateSc.Name == scName {
×
UNCOV
1264
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
UNCOV
1265
                        }
×
1266
                }
1267
                return reqs
×
1268
        }
1269

1270
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1271
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
UNCOV
1272
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1273
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1274
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1275
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1276
                },
1277
        )); err != nil {
×
1278
                return err
×
1279
        }
×
1280

1281
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1282
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
UNCOV
1283
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1284
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1285
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1286
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1287
                },
1288
        )); err != nil {
×
1289
                return err
×
1290
        }
×
1291

1292
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1293
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
UNCOV
1294
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1295
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1296
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1297
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1298
                },
1299
        )); err != nil {
×
1300
                return err
×
1301
        }
×
1302

1303
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1304
                return err
×
1305
        }
×
1306

1307
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1308
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1309
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1310
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
UNCOV
1311
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1312
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1313
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
1314
                        },
×
1315
                },
1316
        )); err != nil {
×
1317
                return err
×
1318
        }
×
1319

1320
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1321
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1322
        }
×
1323

1324
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1325
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1326
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1327
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
UNCOV
1328
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1329
                        },
×
1330
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1331
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1332
                },
1333
        )); err != nil {
×
1334
                return err
×
1335
        }
×
1336

1337
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1338
                if meta.IsNoMatchError(err) {
×
1339
                        // Back out if there's no point to attempt watch
×
1340
                        return nil
×
UNCOV
1341
                }
×
1342
                if !cc.IsErrCacheNotStarted(err) {
×
1343
                        return err
×
1344
                }
×
1345
        }
1346
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1347
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
UNCOV
1348
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1349
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1350
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1351
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1352
                },
1353
        )); err != nil {
×
UNCOV
1354
                return err
×
UNCOV
1355
        }
×
1356

1357
        return nil
×
1358
}
1359

1360
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1361
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1362
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1363
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1364
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1365
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1366
                                log.Info("Update", "sc", obj.GetName(),
×
1367
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1368
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1369
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
UNCOV
1370
                                if err != nil {
×
UNCOV
1371
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
UNCOV
1372
                                }
×
1373
                                return reqs
×
1374
                        },
1375
                ),
1376
                predicate.TypedFuncs[*storagev1.StorageClass]{
1377
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1378
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
UNCOV
1379
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1380
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1381
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1382
                        },
×
1383
                },
1384
        )); err != nil {
×
UNCOV
1385
                return err
×
UNCOV
1386
        }
×
1387

1388
        return nil
×
1389
}
1390

1391
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1392
        dicList := &cdiv1.DataImportCronList{}
×
1393
        if err := c.List(ctx, dicList); err != nil {
×
1394
                return nil, err
×
1395
        }
×
UNCOV
1396
        reqs := []reconcile.Request{}
×
UNCOV
1397
        for _, dic := range dicList.Items {
×
1398
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
UNCOV
1399
                        continue
×
1400
                }
1401

UNCOV
1402
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1403
        }
1404

UNCOV
1405
        return reqs, nil
×
1406
}
1407

1408
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1409
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1410
                return false, nil
1✔
1411
        }
1✔
1412

1413
        sc := pvc.Spec.StorageClassName
1✔
1414
        if sc == nil || *sc == desiredStorageClass {
2✔
1415
                return false, nil
1✔
1416
        }
1✔
1417

1418
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1419
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
UNCOV
1420
                return false, err
×
UNCOV
1421
        }
×
1422

1423
        return true, nil
1✔
1424
}
1425

1426
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1427
        cronJob := &batchv1.CronJob{}
1✔
1428
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1429
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1430
                return false, cc.IgnoreNotFound(err)
1✔
1431
        }
1✔
1432

1433
        cronJobCopy := cronJob.DeepCopy()
1✔
1434
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
UNCOV
1435
                return false, err
×
UNCOV
1436
        }
×
1437

1438
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1439
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1440
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
UNCOV
1441
                        return false, cc.IgnoreNotFound(err)
×
UNCOV
1442
                }
×
1443
        }
1444
        return true, nil
1✔
1445
}
1446

1447
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1448
        cronJob := &batchv1.CronJob{
1✔
1449
                ObjectMeta: metav1.ObjectMeta{
1✔
1450
                        Name:      GetCronJobName(cron),
1✔
1451
                        Namespace: r.cdiNamespace,
1✔
1452
                },
1✔
1453
        }
1✔
1454
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
UNCOV
1455
                return nil, err
×
UNCOV
1456
        }
×
1457
        return cronJob, nil
1✔
1458
}
1459

1460
// InitPollerPodSpec inits poller PodSpec
1461
func InitPollerPodSpec(c client.Client, cron *cdiv1.DataImportCron, podSpec *corev1.PodSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1462
        regSource, err := getCronRegistrySource(cron)
1✔
1463
        if err != nil {
1✔
1464
                return err
×
UNCOV
1465
        }
×
1466
        if regSource.URL == nil {
1✔
1467
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1468
        }
×
1469
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1470
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1471
                return err
×
1472
        }
×
1473
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1474
        if err != nil {
1✔
UNCOV
1475
                return err
×
UNCOV
1476
        }
×
1477
        container := corev1.Container{
1✔
1478
                Name:  "cdi-source-update-poller",
1✔
1479
                Image: image,
1✔
1480
                Command: []string{
1✔
1481
                        "/usr/bin/cdi-source-update-poller",
1✔
1482
                        "-ns", cron.Namespace,
1✔
1483
                        "-cron", cron.Name,
1✔
1484
                        "-url", *regSource.URL,
1✔
1485
                },
1✔
1486
                ImagePullPolicy:          pullPolicy,
1✔
1487
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1488
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1489
        }
1✔
1490

1✔
1491
        var volumes []corev1.Volume
1✔
1492
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1493
        if hasCertConfigMap {
1✔
1494
                vm := corev1.VolumeMount{
×
1495
                        Name:      CertVolName,
×
1496
                        MountPath: common.ImporterCertDir,
×
1497
                }
×
UNCOV
1498
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
UNCOV
1499
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
UNCOV
1500
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
UNCOV
1501
        }
×
1502

1503
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1504
                vm := corev1.VolumeMount{
1✔
1505
                        Name:      ProxyCertVolName,
1✔
1506
                        MountPath: common.ImporterProxyCertDir,
1✔
1507
                }
1✔
1508
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1509
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1510
        }
1✔
1511

1512
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1513
                container.Env = append(container.Env,
×
1514
                        corev1.EnvVar{
×
1515
                                Name: common.ImporterAccessKeyID,
×
1516
                                ValueFrom: &corev1.EnvVarSource{
×
1517
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1518
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1519
                                                        Name: *regSource.SecretRef,
×
1520
                                                },
×
1521
                                                Key: common.KeyAccess,
×
1522
                                        },
×
1523
                                },
×
1524
                        },
×
1525
                        corev1.EnvVar{
×
1526
                                Name: common.ImporterSecretKey,
×
1527
                                ValueFrom: &corev1.EnvVarSource{
×
1528
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1529
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1530
                                                        Name: *regSource.SecretRef,
×
1531
                                                },
×
1532
                                                Key: common.KeySecret,
×
1533
                                        },
×
UNCOV
1534
                                },
×
UNCOV
1535
                        },
×
UNCOV
1536
                )
×
UNCOV
1537
        }
×
1538

1539
        addEnvVar := func(varName, value string) {
2✔
1540
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1541
        }
1✔
1542

1543
        if insecureTLS {
1✔
UNCOV
1544
                addEnvVar(common.InsecureTLSVar, "true")
×
UNCOV
1545
        }
×
1546

1547
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1548
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1549
                        addEnvVar(varName, value)
1✔
1550
                }
1✔
1551
        }
1552

1553
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1554
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1555
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1556

1✔
1557
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1558
        if err != nil {
1✔
1559
                return err
×
1560
        }
×
1561
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1562
        if err != nil {
1✔
UNCOV
1563
                return err
×
UNCOV
1564
        }
×
1565

1566
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1567
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1568
        podSpec.Containers = []corev1.Container{container}
1✔
1569
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1570
        podSpec.Volumes = volumes
1✔
1571
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1572
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1573
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1574
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1575

1✔
1576
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1577
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1578
        if podSpec.SecurityContext != nil {
2✔
1579
                podSpec.SecurityContext.FSGroup = nil
1✔
1580
        }
1✔
1581
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1582
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1583
        }
1✔
1584

1585
        return nil
1✔
1586
}
1587

1588
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1589
        cronJobSpec := &cronJob.Spec
1✔
1590
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1591
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1592
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1593
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1594

1✔
1595
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1596
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1597
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1598
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1599

1✔
1600
        podSpec := &jobSpec.Template.Spec
1✔
1601
        if err := InitPollerPodSpec(r.client, cron, podSpec, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1602
                return err
×
UNCOV
1603
        }
×
1604
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
UNCOV
1605
                return err
×
UNCOV
1606
        }
×
1607
        return nil
1✔
1608
}
1609

1610
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1611
        job := &batchv1.Job{
1✔
1612
                ObjectMeta: metav1.ObjectMeta{
1✔
1613
                        Name:      GetInitialJobName(cron),
1✔
1614
                        Namespace: cronJob.Namespace,
1✔
1615
                },
1✔
1616
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1617
        }
1✔
1618
        if err := r.setJobCommon(cron, job); err != nil {
1✔
UNCOV
1619
                return nil, err
×
UNCOV
1620
        }
×
1621
        return job, nil
1✔
1622
}
1623

1624
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1625
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
UNCOV
1626
                return err
×
UNCOV
1627
        }
×
1628
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1629
        labels := obj.GetLabels()
1✔
1630
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1631
        labels[common.DataImportCronLabel] = cron.Name
1✔
1632
        obj.SetLabels(labels)
1✔
1633
        return nil
1✔
1634
}
1635

1636
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1637
        dv := cron.Spec.Template.DeepCopy()
1✔
1638
        if isCronRegistrySource(cron) {
2✔
1639
                var digestedURL string
1✔
1640
                if isURLSource(cron) {
2✔
1641
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1642
                } else if isImageStreamSource(cron) {
3✔
1643
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1644
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1645
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1646
                }
1✔
1647
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1648
        }
1649
        dv.Name = dataVolumeName
1✔
1650
        dv.Namespace = cron.Namespace
1✔
1651
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1652
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1653
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1654
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1655

1✔
1656
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1657
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1658
        }
1✔
1659

1660
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1661

1✔
1662
        return dv
1✔
1663
}
1664

1665
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1666
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1667
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
1✔
1668
        labels := obj.GetLabels()
1✔
1669
        labels[common.DataImportCronLabel] = cron.Name
1✔
1670
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1671
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1672
        }
1✔
1673
        obj.SetLabels(labels)
1✔
1674
}
1675

1676
func untagDigestedDockerURL(dockerURL string) string {
1✔
1677
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1678
                url := u.Host + u.Path
1✔
1679
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1680
                // Check for tag
1✔
1681
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1682
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1683
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1684
                        }
1✔
1685
                }
1686
        }
1687
        return dockerURL
1✔
1688
}
1689

1690
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1691
        if val := cron.Labels[ann]; val != "" {
2✔
1692
                cc.AddLabel(dv, ann, val)
1✔
1693
        }
1✔
1694
}
1695

1696
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1697
        if val := cron.Annotations[ann]; val != "" {
1✔
UNCOV
1698
                cc.AddAnnotation(dv, ann, val)
×
UNCOV
1699
        }
×
1700
}
1701

1702
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1703
        dataSource := &cdiv1.DataSource{
1✔
1704
                ObjectMeta: metav1.ObjectMeta{
1✔
1705
                        Name:      cron.Spec.ManagedDataSource,
1✔
1706
                        Namespace: cron.Namespace,
1✔
1707
                },
1✔
1708
        }
1✔
1709
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1710
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1711
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1✔
1712
        return dataSource
1✔
1713
}
1✔
1714

1715
// Create DataVolume name based on the DataSource name + prefix of the digest
1716
func createDvName(prefix, digest string) (string, error) {
1✔
1717
        digestPrefix := ""
1✔
1718
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1719
                digestPrefix = digestSha256Prefix
1✔
1720
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1721
                digestPrefix = digestUIDPrefix
1✔
1722
        } else {
2✔
1723
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1724
        }
1✔
1725
        fromIdx := len(digestPrefix)
1✔
1726
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1727
        if len(digest) < toIdx {
2✔
1728
                return "", errors.Errorf("Digest is too short")
1✔
1729
        }
1✔
1730
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1731
}
1732

1733
// GetCronJobName get CronJob name based on cron name and UID
1734
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1735
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1736
}
1✔
1737

1738
// GetInitialJobName get initial job name based on cron name and UID
1739
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1740
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1741
}
1✔
1742

1743
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1744
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1✔
1745
}
1✔
1746

1747
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1748
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1749
}
1✔
1750

1751
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1752
        dv := &cron.Spec.Template
1✔
1753

1✔
1754
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
UNCOV
1755
                return explicitVolumeMode, nil
×
UNCOV
1756
        }
×
1757

1758
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1759
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1760
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1761
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1762
                        AccessModes:      accessModes,
1✔
1763
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1764
                        Resources: corev1.VolumeResourceRequirements{
1✔
1765
                                Requests: corev1.ResourceList{
1✔
1766
                                        // Doesn't matter
1✔
1767
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1768
                                },
1✔
1769
                        },
1✔
1770
                },
1✔
1771
        }
1✔
1772
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
UNCOV
1773
                return nil, err
×
UNCOV
1774
        }
×
1775

1776
        return inferredPvc.Spec.VolumeMode, nil
1✔
1777
}
1778

1779
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1780
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1781
        if dv.Spec.PVC != nil {
1✔
UNCOV
1782
                return dv.Spec.PVC.VolumeMode
×
UNCOV
1783
        }
×
1784

1785
        if dv.Spec.Storage != nil {
2✔
1786
                return dv.Spec.Storage.VolumeMode
1✔
1787
        }
1✔
1788

UNCOV
1789
        return nil
×
1790
}
1791

1792
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1793
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1794
        if dv.Spec.PVC != nil {
1✔
UNCOV
1795
                return dv.Spec.PVC.AccessModes
×
UNCOV
1796
        }
×
1797

1798
        if dv.Spec.Storage != nil {
2✔
1799
                return dv.Spec.Storage.AccessModes
1✔
1800
        }
1✔
1801

UNCOV
1802
        return nil
×
1803
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc