• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5164

03 Mar 2025 01:51PM UTC coverage: 59.429% (+0.02%) from 59.414%
#5164

push

travis-ci

web-flow
Use new 1.23.6 builder (#3652)

* Use new 1.23.6 builder

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

* Bump linter to 1.60.3 for go 1.23 support

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

* Disable linter failures over G115

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

* Fix lint issues related to error format formatting

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

* Address remaining lint failures

len is enough/sprintf not really used

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

---------

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

8 of 9 new or added lines in 7 files covered. (88.89%)

178 existing lines in 7 files now uncovered.

16810 of 28286 relevant lines covered (59.43%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.97
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        batchv1 "k8s.io/api/batch/v1"
37
        corev1 "k8s.io/api/core/v1"
38
        storagev1 "k8s.io/api/storage/v1"
39
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
40
        "k8s.io/apimachinery/pkg/api/meta"
41
        "k8s.io/apimachinery/pkg/api/resource"
42
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
43
        "k8s.io/apimachinery/pkg/labels"
44
        "k8s.io/apimachinery/pkg/runtime"
45
        "k8s.io/apimachinery/pkg/types"
46
        "k8s.io/client-go/tools/record"
47
        openapicommon "k8s.io/kube-openapi/pkg/common"
48
        "k8s.io/utils/ptr"
49

50
        "sigs.k8s.io/controller-runtime/pkg/client"
51
        "sigs.k8s.io/controller-runtime/pkg/controller"
52
        "sigs.k8s.io/controller-runtime/pkg/event"
53
        "sigs.k8s.io/controller-runtime/pkg/handler"
54
        "sigs.k8s.io/controller-runtime/pkg/manager"
55
        "sigs.k8s.io/controller-runtime/pkg/predicate"
56
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
57
        "sigs.k8s.io/controller-runtime/pkg/source"
58

59
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
60
        "kubevirt.io/containerized-data-importer/pkg/common"
61
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
62
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
63
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
64
        "kubevirt.io/containerized-data-importer/pkg/operator"
65
        "kubevirt.io/containerized-data-importer/pkg/util"
66
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
67
)
68

69
const (
70
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
71
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
72
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
73
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
74
)
75

76
// DataImportCronReconciler members
77
type DataImportCronReconciler struct {
78
        client          client.Client
79
        uncachedClient  client.Client
80
        recorder        record.EventRecorder
81
        scheme          *runtime.Scheme
82
        log             logr.Logger
83
        image           string
84
        pullPolicy      string
85
        cdiNamespace    string
86
        installerLabels map[string]string
87
}
88

89
const (
90
        // AnnSourceDesiredDigest is the digest of the pending updated image
91
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
92
        // AnnImageStreamDockerRef is the ImageStream Docker reference
93
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
94
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
95
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
96
        // AnnLastCronTime is the cron last execution time stamp
97
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
98
        // AnnLastUseTime is the PVC last use time stamp
99
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
100
        // AnnStorageClass is the cron DV's storage class
101
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
102

103
        dataImportControllerName    = "dataimportcron-controller"
104
        digestSha256Prefix          = "sha256:"
105
        digestUIDPrefix             = "uid:"
106
        digestDvNameSuffixLength    = 12
107
        cronJobUIDSuffixLength      = 8
108
        defaultImportsToKeepPerCron = 3
109
)
110

111
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
112

113
// Reconcile loop for DataImportCronReconciler
114
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
115
        dataImportCron := &cdiv1.DataImportCron{}
1✔
116
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
117
                return reconcile.Result{}, err
1✔
118
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
119
                err := r.cleanup(ctx, req.NamespacedName)
1✔
120
                return reconcile.Result{}, err
1✔
121
        }
1✔
122
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
123
        if !shouldReconcile || err != nil {
1✔
124
                return reconcile.Result{}, err
×
UNCOV
125
        }
×
126

127
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
128
                return reconcile.Result{}, err
×
UNCOV
129
        }
×
130

131
        return r.update(ctx, dataImportCron)
1✔
132
}
133

134
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
135
        dataSource := &cdiv1.DataSource{}
1✔
136
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
137
                if k8serrors.IsNotFound(err) {
2✔
138
                        return true, nil
1✔
139
                }
1✔
UNCOV
140
                return false, err
×
141
        }
142
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
143
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
144
                return true, nil
1✔
145
        }
1✔
146
        otherCron := &cdiv1.DataImportCron{}
1✔
147
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
148
                if k8serrors.IsNotFound(err) {
2✔
149
                        return true, nil
1✔
150
                }
1✔
UNCOV
151
                return false, err
×
152
        }
153
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
154
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
155
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
156
                r.log.V(3).Info(msg)
1✔
157
                return false, nil
1✔
158
        }
1✔
UNCOV
159
        return true, nil
×
160
}
161

162
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
163
        if dataImportCron.Spec.Schedule == "" {
2✔
164
                return nil
1✔
165
        }
1✔
166
        if isControllerPolledSource(dataImportCron) {
2✔
167
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
168
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
169
                }
1✔
170
                return nil
1✔
171
        }
172
        if !isURLSource(dataImportCron) {
1✔
173
                return nil
×
UNCOV
174
        }
×
175
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
176
        if exists || err != nil {
2✔
177
                return err
1✔
178
        }
1✔
179
        cronJob, err := r.newCronJob(dataImportCron)
1✔
180
        if err != nil {
1✔
181
                return err
×
UNCOV
182
        }
×
183
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
184
                return err
×
UNCOV
185
        }
×
186
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
187
        if err != nil {
1✔
188
                return err
×
UNCOV
189
        }
×
190
        if err := r.client.Create(ctx, job); err != nil {
1✔
191
                return err
×
UNCOV
192
        }
×
193
        return nil
1✔
194
}
195

196
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
197
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
198
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
UNCOV
199
        }
×
200
        imageStream := &imagev1.ImageStream{}
1✔
201
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
202
        if err != nil {
2✔
203
                return nil, "", err
1✔
204
        }
1✔
205
        imageStreamNamespacedName := types.NamespacedName{
1✔
206
                Namespace: imageStreamNamespace,
1✔
207
                Name:      name,
1✔
208
        }
1✔
209
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
210
                return nil, "", err
1✔
211
        }
1✔
212
        return imageStream, tag, nil
1✔
213
}
214

215
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
216
        if imageStream == nil {
1✔
217
                return "", "", errors.Errorf("No ImageStream")
×
UNCOV
218
        }
×
219
        tags := imageStream.Status.Tags
1✔
220
        if len(tags) == 0 {
1✔
221
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
UNCOV
222
        }
×
223

224
        tagIdx := 0
1✔
225
        if len(imageStreamTag) > 0 {
2✔
226
                tagIdx = -1
1✔
227
                for i, tag := range tags {
2✔
228
                        if tag.Tag == imageStreamTag {
2✔
229
                                tagIdx = i
1✔
230
                                break
1✔
231
                        }
232
                }
233
        }
234
        if tagIdx == -1 {
2✔
235
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
236
        }
1✔
237

238
        if len(tags[tagIdx].Items) == 0 {
2✔
239
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
240
        }
1✔
241
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
242
}
243

244
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
245
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
246
                return imageStreamName, "", nil
1✔
247
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
248
                return subs[0], subs[1], nil
1✔
249
        }
1✔
250
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
251
}
252

253
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
254
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
255
        if nextTimeStr == "" {
1✔
256
                return r.setNextCronTime(dataImportCron)
×
UNCOV
257
        }
×
258
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
259
        if err != nil {
1✔
260
                return reconcile.Result{}, err
×
UNCOV
261
        }
×
262
        if nextTime.After(time.Now()) {
2✔
263
                return r.setNextCronTime(dataImportCron)
1✔
264
        }
1✔
265
        if isImageStreamSource(dataImportCron) {
2✔
266
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
267
                        return reconcile.Result{}, err
1✔
268
                }
1✔
269
        } else if isPvcSource(dataImportCron) {
2✔
270
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
271
                        return reconcile.Result{}, err
1✔
272
                }
1✔
273
        }
274
        return r.setNextCronTime(dataImportCron)
1✔
275
}
276

277
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
278
        now := time.Now()
1✔
279
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
280
        if err != nil {
1✔
281
                return reconcile.Result{}, err
×
UNCOV
282
        }
×
283
        nextTime := expr.Next(now)
1✔
284
        requeueAfter := nextTime.Sub(now)
1✔
285
        res := reconcile.Result{Requeue: true, RequeueAfter: requeueAfter}
1✔
286
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
287
        return res, err
1✔
288
}
289

290
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
291
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
292
        return err == nil && regSource.ImageStream != nil
1✔
293
}
1✔
294

295
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
296
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
297
        return err == nil && regSource.URL != nil
1✔
298
}
1✔
299

300
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
301
        if !isCronRegistrySource(cron) {
2✔
302
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
303
        }
1✔
304
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
305
}
306

307
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
308
        source := cron.Spec.Template.Spec.Source
1✔
309
        return source != nil && source.Registry != nil
1✔
310
}
1✔
311

312
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
313
        if !isPvcSource(cron) {
1✔
314
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
UNCOV
315
        }
×
316
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
317
}
318

319
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
320
        source := cron.Spec.Template.Spec.Source
1✔
321
        return source != nil && source.PVC != nil
1✔
322
}
1✔
323

324
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
325
        return isImageStreamSource(cron) || isPvcSource(cron)
1✔
326
}
1✔
327

328
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
329
        res := reconcile.Result{}
1✔
330

1✔
331
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
332
        if err != nil {
1✔
333
                return res, err
×
UNCOV
334
        }
×
335

336
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
337
        imports := dataImportCron.Status.CurrentImports
1✔
338
        importSucceeded := false
1✔
339

1✔
340
        dataVolume := dataImportCron.Spec.Template
1✔
341
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
342
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
343
        if err != nil {
1✔
344
                return res, err
×
UNCOV
345
        }
×
346
        if desiredStorageClass != nil {
2✔
347
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
348
                        return res, err
1✔
349
                }
1✔
350
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
351
                desiredSc := desiredStorageClass.Name
1✔
352
                if hasCurrent && currentSc != desiredSc {
2✔
353
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
354
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
355
                                return res, err
×
UNCOV
356
                        }
×
357
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
358
                }
359
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
360
        }
361
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
362
        if err != nil {
1✔
363
                return res, err
×
UNCOV
364
        }
×
365
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
366
        if err != nil {
1✔
367
                return res, err
×
UNCOV
368
        }
×
369

370
        handlePopulatedPvc := func() error {
2✔
371
                if pvc != nil {
2✔
372
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
373
                                return err
×
UNCOV
374
                        }
×
375
                }
376
                importSucceeded = true
1✔
377
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
378
                        return err
×
UNCOV
379
                }
×
380

381
                return nil
1✔
382
        }
383

384
        switch {
1✔
385
        case dv != nil:
1✔
386
                switch dv.Status.Phase {
1✔
387
                case cdiv1.Succeeded:
1✔
388
                        if err := handlePopulatedPvc(); err != nil {
1✔
389
                                return res, err
×
UNCOV
390
                        }
×
391
                case cdiv1.ImportScheduled:
1✔
392
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
393
                case cdiv1.ImportInProgress:
1✔
394
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
395
                default:
1✔
396
                        dvPhase := string(dv.Status.Phase)
1✔
397
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
398
                }
399
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
400
                if err := handlePopulatedPvc(); err != nil {
1✔
401
                        return res, err
×
UNCOV
402
                }
×
403
        case snapshot != nil:
1✔
404
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
405
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
406
                                return res, err
×
407
                        }
×
408
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
UNCOV
409
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
410
                }
411
                // Below k8s 1.29 there's no way to know the source volume mode
412
                // Let's at least expose this info on our own snapshots
413
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
414
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
415
                        if err != nil {
1✔
416
                                return res, err
×
UNCOV
417
                        }
×
418
                        if volMode != nil {
2✔
419
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
420
                        }
1✔
421
                }
422
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
423
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
424
                if err != nil {
2✔
425
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
426
                                return res, err
×
UNCOV
427
                        }
×
428
                } else {
1✔
429
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
430
                }
1✔
431
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
432
                        return res, err
×
UNCOV
433
                }
×
434
                importSucceeded = true
1✔
435
        default:
1✔
436
                if len(imports) > 0 {
2✔
437
                        imports = imports[1:]
1✔
438
                        dataImportCron.Status.CurrentImports = imports
1✔
439
                }
1✔
440
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
441
        }
442

443
        if importSucceeded {
2✔
444
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
445
                        return res, err
×
UNCOV
446
                }
×
447
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
448
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
449
                        return res, err
×
UNCOV
450
                }
×
451
        }
452

453
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
454
                return res, err
×
UNCOV
455
        }
×
456

457
        // Skip if schedule is disabled
458
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
459
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
460
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
461
                if err != nil {
2✔
462
                        return pollRes, err
1✔
463
                }
1✔
464
                res = pollRes
1✔
465
        }
466

467
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
468
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
469
        if digestUpdated {
2✔
470
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
471
                if dv != nil {
1✔
472
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
473
                                return res, err
×
UNCOV
474
                        }
×
475
                }
476
                if importSucceeded || len(imports) == 0 {
2✔
477
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
478
                                return res, err
1✔
479
                        }
1✔
480
                }
481
        } else if importSucceeded {
2✔
482
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
483
                        return res, err
×
UNCOV
484
                }
×
485
        } else if len(imports) > 0 {
2✔
486
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
487
        } else {
2✔
488
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
489
        }
1✔
490

491
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
492
                return res, err
×
UNCOV
493
        }
×
494

495
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
496
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
497
                        return res, err
×
UNCOV
498
                }
×
499
        }
500
        return res, nil
1✔
501
}
502

503
// Returns the current import DV if exists, and the last imported PVC
504
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
505
        imports := cron.Status.CurrentImports
1✔
506
        if len(imports) == 0 {
2✔
507
                return nil, nil, nil
1✔
508
        }
1✔
509

510
        dvName := imports[0].DataVolumeName
1✔
511
        dv := &cdiv1.DataVolume{}
1✔
512
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
513
                if !k8serrors.IsNotFound(err) {
1✔
514
                        return nil, nil, err
×
UNCOV
515
                }
×
516
                dv = nil
1✔
517
        }
518

519
        pvc := &corev1.PersistentVolumeClaim{}
1✔
520
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
521
                if !k8serrors.IsNotFound(err) {
1✔
522
                        return nil, nil, err
×
UNCOV
523
                }
×
524
                pvc = nil
1✔
525
        }
526
        return dv, pvc, nil
1✔
527
}
528

529
// Returns the current import DV if exists, and the last imported PVC
530
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
531
        imports := cron.Status.CurrentImports
1✔
532
        if len(imports) == 0 {
2✔
533
                return nil, nil
1✔
534
        }
1✔
535

536
        snapName := imports[0].DataVolumeName
1✔
537
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
538
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
539
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
540
                        return nil, err
×
UNCOV
541
                }
×
542
                return nil, nil
1✔
543
        }
544

545
        return snapshot, nil
1✔
546
}
547

548
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
549
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
550
        dataSource := &cdiv1.DataSource{}
1✔
551
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
552
                return nil, err
1✔
553
        }
1✔
554
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
555
                log := r.log.WithName("getCronManagedDataSource")
×
556
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
557
                return nil, ErrNotManagedByCron
×
UNCOV
558
        }
×
559
        return dataSource, nil
1✔
560
}
561

562
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
563
        objCopy := obj.DeepCopyObject()
1✔
564
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
565
        r.setDataImportCronResourceLabels(cron, obj)
1✔
566
        if !reflect.DeepEqual(obj, objCopy) {
2✔
567
                if err := r.client.Update(ctx, obj); err != nil {
1✔
568
                        return err
×
UNCOV
569
                }
×
570
        }
571
        return nil
1✔
572
}
573

574
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
575
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
576
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
577
                if cond.Status == corev1.ConditionFalse &&
×
578
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
579
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
580
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
581
                        dv.Labels[common.DataImportCronLabel] = ""
×
582
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
583
                                return err
×
584
                        }
×
585
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
586
                                return err
×
587
                        }
×
UNCOV
588
                        cron.Status.CurrentImports = nil
×
589
                }
590
        }
UNCOV
591
        return nil
×
592
}
593

594
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
595
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
596
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
597
        if err != nil {
1✔
598
                return err
×
UNCOV
599
        }
×
600
        if regSource.ImageStream == nil {
1✔
601
                return nil
×
UNCOV
602
        }
×
603
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
604
        if err != nil {
2✔
605
                return err
1✔
606
        }
1✔
607
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
608
        if err != nil {
2✔
609
                return err
1✔
610
        }
1✔
611
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
612
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
613
                log.Info("Updating DataImportCron", "digest", digest)
1✔
614
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
615
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
616
        }
1✔
617
        return nil
1✔
618
}
619

620
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
621
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
622
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
623
        if err != nil {
1✔
624
                return err
×
UNCOV
625
        }
×
626
        ns := pvcSource.Namespace
1✔
627
        if ns == "" {
2✔
628
                ns = dataImportCron.Namespace
1✔
629
        }
1✔
630
        pvc := &corev1.PersistentVolumeClaim{}
1✔
631
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
632
                return err
1✔
633
        }
1✔
634
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
635
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
636
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
637
                log.Info("Updating DataImportCron", "digest", digest)
1✔
638
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
639
        }
1✔
640
        return nil
1✔
641
}
642

643
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
644
        log := r.log.WithName("updateDataSource")
1✔
645
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
646
        if err != nil {
2✔
647
                if k8serrors.IsNotFound(err) {
2✔
648
                        dataSource = r.newDataSource(dataImportCron)
1✔
649
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
650
                                return err
×
UNCOV
651
                        }
×
652
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
653
                } else if errors.Is(err, ErrNotManagedByCron) {
×
654
                        return nil
×
655
                } else {
×
656
                        return err
×
UNCOV
657
                }
×
658
        }
659
        dataSourceCopy := dataSource.DeepCopy()
1✔
660
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
661

1✔
662
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
663
        populateDataSource(format, dataSource, sourcePVC)
1✔
664

1✔
665
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
666
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
667
                        return err
×
UNCOV
668
                }
×
669
        }
670

671
        return nil
1✔
672
}
673

674
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
675
        if sourcePVC == nil {
2✔
676
                return
1✔
677
        }
1✔
678

679
        switch format {
1✔
680
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
681
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
682
                        PVC: sourcePVC,
1✔
683
                }
1✔
684
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
685
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
686
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
687
                                Namespace: sourcePVC.Namespace,
1✔
688
                                Name:      sourcePVC.Name,
1✔
689
                        },
1✔
690
                }
1✔
691
        }
692
}
693

694
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
695
        if dataImportCron.Status.CurrentImports == nil {
1✔
696
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
UNCOV
697
        }
×
698
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
699
                Namespace: dataImportCron.Namespace,
1✔
700
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
701
        }
1✔
702
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
703
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
704
                now := metav1.Now()
1✔
705
                dataImportCron.Status.LastImportTimestamp = &now
1✔
706
        }
1✔
707
        return nil
1✔
708
}
709

710
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
711
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
712
        if lastTimeStr == "" {
2✔
713
                return nil
1✔
714
        }
1✔
715
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
716
        if err != nil {
1✔
717
                return err
×
UNCOV
718
        }
×
719
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
720
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
721
        }
1✔
722
        return nil
1✔
723
}
724

725
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
726
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
727
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
728
        if digest == "" {
1✔
729
                return nil
×
UNCOV
730
        }
×
731
        dvName, err := createDvName(dataSourceName, digest)
1✔
732
        if err != nil {
2✔
733
                return err
1✔
734
        }
1✔
735
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
736

1✔
737
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
738
        for _, src := range sources {
2✔
739
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
740
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
741
                                return err
×
UNCOV
742
                        }
×
743
                } else {
1✔
744
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
745
                                return err
×
UNCOV
746
                        }
×
747
                        // If source exists don't create DV
748
                        return nil
1✔
749
                }
750
        }
751

752
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
753
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
754
                return err
×
UNCOV
755
        }
×
756

757
        return nil
1✔
758
}
759

760
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
761
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
762
        if !ok {
1✔
763
                // nothing to delete
×
764
                return nil
×
UNCOV
765
        }
×
766
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
767
        if err != nil {
1✔
768
                return err
×
UNCOV
769
        }
×
770
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
771
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
772
        for _, src := range sources {
2✔
773
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
774
                        return err
×
UNCOV
775
                }
×
776
        }
777
        for _, src := range sources {
2✔
778
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
779
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
UNCOV
780
                }
×
781
        }
782
        // Only update desired storage class once garbage collection went through
783
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
784
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
785
        if err != nil {
1✔
786
                return err
×
UNCOV
787
        }
×
788

789
        return nil
1✔
790
}
791

792
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
793
        switch format {
1✔
794
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
795
                return nil
1✔
796
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
797
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
798
        default:
×
UNCOV
799
                return fmt.Errorf("unknown source format for snapshot")
×
800
        }
801
}
802

803
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
804
        if pvc == nil {
1✔
805
                return nil
×
UNCOV
806
        }
×
807
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
808
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
809
                return nil
1✔
810
        }
1✔
811
        storageProfile := &cdiv1.StorageProfile{}
1✔
812
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
813
                return err
×
UNCOV
814
        }
×
815
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
816
        if err != nil {
1✔
817
                return err
×
UNCOV
818
        }
×
819
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
820
                ObjectMeta: metav1.ObjectMeta{
1✔
821
                        Name:      pvc.Name,
1✔
822
                        Namespace: dataImportCron.Namespace,
1✔
823
                        Labels: map[string]string{
1✔
824
                                common.CDILabelKey:       common.CDILabelValue,
1✔
825
                                common.CDIComponentLabel: "",
1✔
826
                        },
1✔
827
                },
1✔
828
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
829
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
830
                                PersistentVolumeClaimName: &pvc.Name,
1✔
831
                        },
1✔
832
                        VolumeSnapshotClassName: &className,
1✔
833
                },
1✔
834
        }
1✔
835
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
836
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
837

1✔
838
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
839
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
840
                if !k8serrors.IsNotFound(err) {
1✔
841
                        return err
×
UNCOV
842
                }
×
843
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
844
                if pvc.Spec.VolumeMode != nil {
2✔
845
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
846
                }
1✔
847
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
848
                        return err
×
UNCOV
849
                }
×
850
        } else {
1✔
851
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
852
                        // Clean up DV/PVC as they are not needed anymore
1✔
853
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
854
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
855
                                return err
×
UNCOV
856
                        }
×
857
                }
858
        }
859

860
        return nil
1✔
861
}
862

863
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
864
        dataImportCron.Status.SourceFormat = &format
1✔
865

1✔
866
        switch format {
1✔
867
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
868
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
869
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
870
                if snapshot == nil {
2✔
871
                        // Snapshot create/update will trigger reconcile
1✔
872
                        return nil
1✔
873
                }
1✔
874
                if cc.IsSnapshotReady(snapshot) {
2✔
875
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
876
                } else {
2✔
877
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
878
                }
1✔
879
        default:
×
UNCOV
880
                return fmt.Errorf("unknown source format for snapshot")
×
881
        }
882

883
        return nil
1✔
884
}
885

886
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
887
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
888
        if desiredStorageClass == nil {
2✔
889
                return format, nil
1✔
890
        }
1✔
891

892
        storageProfile := &cdiv1.StorageProfile{}
1✔
893
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
894
                return format, err
×
UNCOV
895
        }
×
896
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
897
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
898
        }
1✔
899

900
        return format, nil
1✔
901
}
902

903
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
904
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
905
                return nil
×
UNCOV
906
        }
×
907
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
908
        if err != nil {
1✔
909
                return err
×
UNCOV
910
        }
×
911

912
        maxImports := defaultImportsToKeepPerCron
1✔
913

1✔
914
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
915
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
916
        }
1✔
917

918
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
919
                return err
×
UNCOV
920
        }
×
921
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
922
                return err
×
UNCOV
923
        }
×
924

925
        return nil
1✔
926
}
927

928
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
929
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
930

1✔
931
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
932
                return err
×
UNCOV
933
        }
×
934
        if len(pvcList.Items) > maxImports {
2✔
935
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
936
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
937
                })
1✔
938
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
939
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
940
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
941
                                return err
×
UNCOV
942
                        }
×
943
                }
944
        }
945

946
        dvList := &cdiv1.DataVolumeList{}
1✔
947
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
948
                return err
×
UNCOV
949
        }
×
950

951
        if len(dvList.Items) > maxImports {
2✔
952
                for _, dv := range dvList.Items {
2✔
953
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
954
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
955
                                return err
×
UNCOV
956
                        }
×
957

958
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
959
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
960
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
961
                                        return err
×
UNCOV
962
                                }
×
963
                        }
964
                }
965
        }
966

967
        return nil
1✔
968
}
969

970
// deleteDvPvc deletes DV or PVC if DV was GCed
971
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
972
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
973
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
974
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
975
                return err
1✔
976
        }
1✔
977
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
978
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
979
                return err
×
UNCOV
980
        }
×
981
        return nil
1✔
982
}
983

984
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
985
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
986

1✔
987
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
988
                if meta.IsNoMatchError(err) {
×
989
                        return nil
×
990
                }
×
UNCOV
991
                return err
×
992
        }
993
        if len(snapList.Items) > maxImports {
1✔
994
                sort.Slice(snapList.Items, func(i, j int) bool {
×
995
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
996
                })
×
997
                for _, snap := range snapList.Items[maxImports:] {
×
998
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
999
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1000
                                return err
×
UNCOV
1001
                        }
×
1002
                }
1003
        }
1004

1005
        return nil
1✔
1006
}
1007

1008
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1009
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1010
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1011

1✔
1012
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1013
                return err
×
UNCOV
1014
        }
×
1015
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1016
        if err != nil {
1✔
1017
                return err
×
UNCOV
1018
        }
×
1019
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1020
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1021
                return err
×
UNCOV
1022
        }
×
1023
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1024
                return err
×
UNCOV
1025
        }
×
1026
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1027
                return err
×
UNCOV
1028
        }
×
1029
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1030
                return err
×
UNCOV
1031
        }
×
1032
        return nil
1✔
1033
}
1034

1035
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1036
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1037
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1038
        if err != nil {
1✔
1039
                return err
×
UNCOV
1040
        }
×
1041
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1042
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1043
                return err
×
UNCOV
1044
        }
×
1045
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1046
                return err
×
UNCOV
1047
        }
×
1048

1049
        return nil
1✔
1050
}
1051

1052
// NewDataImportCronController creates a new instance of the DataImportCron controller
1053
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1054
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1055
                Scheme: mgr.GetScheme(),
×
1056
                Mapper: mgr.GetRESTMapper(),
×
1057
        })
×
1058
        if err != nil {
×
1059
                return nil, err
×
1060
        }
×
1061
        reconciler := &DataImportCronReconciler{
×
1062
                client:          mgr.GetClient(),
×
1063
                uncachedClient:  uncachedClient,
×
1064
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1065
                scheme:          mgr.GetScheme(),
×
1066
                log:             log.WithName(dataImportControllerName),
×
1067
                image:           importerImage,
×
1068
                pullPolicy:      pullPolicy,
×
1069
                cdiNamespace:    util.GetNamespace(),
×
1070
                installerLabels: installerLabels,
×
1071
        }
×
1072
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1073
                MaxConcurrentReconciles: 3,
×
1074
                Reconciler:              reconciler,
×
1075
        })
×
1076
        if err != nil {
×
1077
                return nil, err
×
1078
        }
×
1079
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1080
                return nil, err
×
1081
        }
×
1082
        log.Info("Initialized DataImportCron controller")
×
UNCOV
1083
        return dataImportCronController, nil
×
1084
}
1085

1086
func getCronName(obj client.Object) string {
×
1087
        return obj.GetLabels()[common.DataImportCronLabel]
×
UNCOV
1088
}
×
1089

1090
func getCronNs(obj client.Object) string {
×
1091
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
UNCOV
1092
}
×
1093

1094
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1095
        if cronName := getCronName(obj); cronName != "" {
×
1096
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1097
        }
×
UNCOV
1098
        return nil
×
1099
}
1100

1101
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1102
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1103
                return err
×
UNCOV
1104
        }
×
1105

1106
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1107
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1108
                // Otherwise we risk losing the storage profile event
×
1109
                var crons cdiv1.DataImportCronList
×
1110
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1111
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1112
                        return nil
×
UNCOV
1113
                }
×
1114
                // Storage profiles are 1:1 to storage classes
1115
                scName := obj.GetName()
×
1116
                var reqs []reconcile.Request
×
1117
                for _, cron := range crons.Items {
×
1118
                        dataVolume := cron.Spec.Template
×
1119
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1120
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1121
                        if err != nil || templateSc == nil {
×
1122
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1123
                                return reqs
×
1124
                        }
×
1125
                        if templateSc.Name == scName {
×
1126
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
UNCOV
1127
                        }
×
1128
                }
UNCOV
1129
                return reqs
×
1130
        }
1131

1132
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1133
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1134
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1135
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1136
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
UNCOV
1137
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1138
                },
1139
        )); err != nil {
×
1140
                return err
×
UNCOV
1141
        }
×
1142

1143
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1144
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1145
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1146
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1147
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
UNCOV
1148
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1149
                },
1150
        )); err != nil {
×
1151
                return err
×
UNCOV
1152
        }
×
1153

1154
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1155
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1156
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1157
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1158
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
UNCOV
1159
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1160
                },
1161
        )); err != nil {
×
1162
                return err
×
UNCOV
1163
        }
×
1164

1165
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1166
                return err
×
UNCOV
1167
        }
×
1168

1169
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1170
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1171
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1172
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1173
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1174
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1175
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
UNCOV
1176
                        },
×
1177
                },
1178
        )); err != nil {
×
1179
                return err
×
UNCOV
1180
        }
×
1181

1182
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1183
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
UNCOV
1184
        }
×
1185

1186
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1187
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1188
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1189
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1190
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1191
                        },
×
1192
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
UNCOV
1193
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1194
                },
1195
        )); err != nil {
×
1196
                return err
×
UNCOV
1197
        }
×
1198

1199
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1200
                if meta.IsNoMatchError(err) {
×
1201
                        // Back out if there's no point to attempt watch
×
1202
                        return nil
×
1203
                }
×
1204
                if !cc.IsErrCacheNotStarted(err) {
×
1205
                        return err
×
UNCOV
1206
                }
×
1207
        }
1208
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1209
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1210
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1211
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1212
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
UNCOV
1213
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1214
                },
1215
        )); err != nil {
×
1216
                return err
×
UNCOV
1217
        }
×
1218

UNCOV
1219
        return nil
×
1220
}
1221

1222
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1223
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1224
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1225
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1226
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1227
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1228
                                log.Info("Update", "sc", obj.GetName(),
×
1229
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1230
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1231
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1232
                                if err != nil {
×
1233
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1234
                                }
×
UNCOV
1235
                                return reqs
×
1236
                        },
1237
                ),
1238
                predicate.TypedFuncs[*storagev1.StorageClass]{
1239
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1240
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1241
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1242
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1243
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
UNCOV
1244
                        },
×
1245
                },
1246
        )); err != nil {
×
1247
                return err
×
UNCOV
1248
        }
×
1249

UNCOV
1250
        return nil
×
1251
}
1252

1253
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1254
        dicList := &cdiv1.DataImportCronList{}
×
1255
        if err := c.List(ctx, dicList); err != nil {
×
1256
                return nil, err
×
1257
        }
×
1258
        reqs := []reconcile.Request{}
×
1259
        for _, dic := range dicList.Items {
×
1260
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
UNCOV
1261
                        continue
×
1262
                }
1263

UNCOV
1264
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1265
        }
1266

UNCOV
1267
        return reqs, nil
×
1268
}
1269

1270
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1271
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1272
                return false, nil
1✔
1273
        }
1✔
1274

1275
        sc := pvc.Spec.StorageClassName
1✔
1276
        if sc == nil || *sc == desiredStorageClass {
2✔
1277
                return false, nil
1✔
1278
        }
1✔
1279

1280
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1281
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1282
                return false, err
×
UNCOV
1283
        }
×
1284

1285
        return true, nil
1✔
1286
}
1287

1288
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1289
        cronJob := &batchv1.CronJob{}
1✔
1290
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1291
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1292
                return false, cc.IgnoreNotFound(err)
1✔
1293
        }
1✔
1294

1295
        cronJobCopy := cronJob.DeepCopy()
1✔
1296
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1297
                return false, err
×
UNCOV
1298
        }
×
1299

1300
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1301
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1302
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1303
                        return false, cc.IgnoreNotFound(err)
×
UNCOV
1304
                }
×
1305
        }
1306
        return true, nil
1✔
1307
}
1308

1309
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1310
        cronJob := &batchv1.CronJob{
1✔
1311
                ObjectMeta: metav1.ObjectMeta{
1✔
1312
                        Name:      GetCronJobName(cron),
1✔
1313
                        Namespace: r.cdiNamespace,
1✔
1314
                },
1✔
1315
        }
1✔
1316
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1317
                return nil, err
×
UNCOV
1318
        }
×
1319
        return cronJob, nil
1✔
1320
}
1321

1322
// InitPollerPodSpec inits poller PodSpec
1323
func InitPollerPodSpec(c client.Client, cron *cdiv1.DataImportCron, podSpec *corev1.PodSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1324
        regSource, err := getCronRegistrySource(cron)
1✔
1325
        if err != nil {
1✔
1326
                return err
×
UNCOV
1327
        }
×
1328
        if regSource.URL == nil {
1✔
1329
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
UNCOV
1330
        }
×
1331
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1332
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1333
                return err
×
UNCOV
1334
        }
×
1335
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1336
        if err != nil {
1✔
1337
                return err
×
UNCOV
1338
        }
×
1339
        container := corev1.Container{
1✔
1340
                Name:  "cdi-source-update-poller",
1✔
1341
                Image: image,
1✔
1342
                Command: []string{
1✔
1343
                        "/usr/bin/cdi-source-update-poller",
1✔
1344
                        "-ns", cron.Namespace,
1✔
1345
                        "-cron", cron.Name,
1✔
1346
                        "-url", *regSource.URL,
1✔
1347
                },
1✔
1348
                ImagePullPolicy:          pullPolicy,
1✔
1349
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1350
                TerminationMessagePolicy: corev1.TerminationMessageReadFile,
1✔
1351
        }
1✔
1352

1✔
1353
        var volumes []corev1.Volume
1✔
1354
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1355
        if hasCertConfigMap {
1✔
1356
                vm := corev1.VolumeMount{
×
1357
                        Name:      CertVolName,
×
1358
                        MountPath: common.ImporterCertDir,
×
1359
                }
×
1360
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1361
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1362
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
UNCOV
1363
        }
×
1364

1365
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1366
                vm := corev1.VolumeMount{
1✔
1367
                        Name:      ProxyCertVolName,
1✔
1368
                        MountPath: common.ImporterProxyCertDir,
1✔
1369
                }
1✔
1370
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1371
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1372
        }
1✔
1373

1374
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1375
                container.Env = append(container.Env,
×
1376
                        corev1.EnvVar{
×
1377
                                Name: common.ImporterAccessKeyID,
×
1378
                                ValueFrom: &corev1.EnvVarSource{
×
1379
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1380
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1381
                                                        Name: *regSource.SecretRef,
×
1382
                                                },
×
1383
                                                Key: common.KeyAccess,
×
1384
                                        },
×
1385
                                },
×
1386
                        },
×
1387
                        corev1.EnvVar{
×
1388
                                Name: common.ImporterSecretKey,
×
1389
                                ValueFrom: &corev1.EnvVarSource{
×
1390
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1391
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1392
                                                        Name: *regSource.SecretRef,
×
1393
                                                },
×
1394
                                                Key: common.KeySecret,
×
1395
                                        },
×
1396
                                },
×
1397
                        },
×
1398
                )
×
UNCOV
1399
        }
×
1400

1401
        addEnvVar := func(varName, value string) {
2✔
1402
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1403
        }
1✔
1404

1405
        if insecureTLS {
1✔
1406
                addEnvVar(common.InsecureTLSVar, "true")
×
UNCOV
1407
        }
×
1408

1409
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1410
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1411
                        addEnvVar(varName, value)
1✔
1412
                }
1✔
1413
        }
1414

1415
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1416
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1417
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1418

1✔
1419
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1420
        if err != nil {
1✔
1421
                return err
×
UNCOV
1422
        }
×
1423
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1424
        if err != nil {
1✔
1425
                return err
×
UNCOV
1426
        }
×
1427

1428
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1429
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1430
        podSpec.Containers = []corev1.Container{container}
1✔
1431
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1432
        podSpec.Volumes = volumes
1✔
1433
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1434
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1435
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1436
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1437

1✔
1438
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1439
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1440
        if podSpec.SecurityContext != nil {
2✔
1441
                podSpec.SecurityContext.FSGroup = nil
1✔
1442
        }
1✔
1443
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1444
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1445
        }
1✔
1446

1447
        return nil
1✔
1448
}
1449

1450
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1451
        cronJobSpec := &cronJob.Spec
1✔
1452
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1453
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1454
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1455
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1456

1✔
1457
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1458
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1459
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1460
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1461

1✔
1462
        podSpec := &jobSpec.Template.Spec
1✔
1463
        if err := InitPollerPodSpec(r.client, cron, podSpec, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
UNCOV
1464
                return err
×
UNCOV
1465
        }
×
1466
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
UNCOV
1467
                return err
×
UNCOV
1468
        }
×
1469
        return nil
1✔
1470
}
1471

1472
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1473
        job := &batchv1.Job{
1✔
1474
                ObjectMeta: metav1.ObjectMeta{
1✔
1475
                        Name:      GetInitialJobName(cron),
1✔
1476
                        Namespace: cronJob.Namespace,
1✔
1477
                },
1✔
1478
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1479
        }
1✔
1480
        if err := r.setJobCommon(cron, job); err != nil {
1✔
UNCOV
1481
                return nil, err
×
UNCOV
1482
        }
×
1483
        return job, nil
1✔
1484
}
1485

1486
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1487
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
UNCOV
1488
                return err
×
UNCOV
1489
        }
×
1490
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1491
        labels := obj.GetLabels()
1✔
1492
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1493
        labels[common.DataImportCronLabel] = cron.Name
1✔
1494
        obj.SetLabels(labels)
1✔
1495
        return nil
1✔
1496
}
1497

1498
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1499
        dv := cron.Spec.Template.DeepCopy()
1✔
1500
        if isCronRegistrySource(cron) {
2✔
1501
                var digestedURL string
1✔
1502
                if isURLSource(cron) {
2✔
1503
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1504
                } else if isImageStreamSource(cron) {
3✔
1505
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1506
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1507
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1508
                }
1✔
1509
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1510
        }
1511
        dv.Name = dataVolumeName
1✔
1512
        dv.Namespace = cron.Namespace
1✔
1513
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1514
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1515
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1516
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1517

1✔
1518
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1519
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1520
        }
1✔
1521

1522
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1523

1✔
1524
        return dv
1✔
1525
}
1526

1527
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1528
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1529
        labels := obj.GetLabels()
1✔
1530
        labels[common.DataImportCronLabel] = cron.Name
1✔
1531
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1532
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1533
        }
1✔
1534
        obj.SetLabels(labels)
1✔
1535
}
1536

1537
func untagDigestedDockerURL(dockerURL string) string {
1✔
1538
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1539
                url := u.Host + u.Path
1✔
1540
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1541
                // Check for tag
1✔
1542
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1543
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1544
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1545
                        }
1✔
1546
                }
1547
        }
1548
        return dockerURL
1✔
1549
}
1550

1551
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1552
        if val := cron.Labels[ann]; val != "" {
2✔
1553
                cc.AddLabel(dv, ann, val)
1✔
1554
        }
1✔
1555
}
1556

1557
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1558
        if val := cron.Annotations[ann]; val != "" {
1✔
UNCOV
1559
                cc.AddAnnotation(dv, ann, val)
×
UNCOV
1560
        }
×
1561
}
1562

1563
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1564
        dataSource := &cdiv1.DataSource{
1✔
1565
                ObjectMeta: metav1.ObjectMeta{
1✔
1566
                        Name:      cron.Spec.ManagedDataSource,
1✔
1567
                        Namespace: cron.Namespace,
1✔
1568
                },
1✔
1569
        }
1✔
1570
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1571
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1572
        return dataSource
1✔
1573
}
1✔
1574

1575
// Create DataVolume name based on the DataSource name + prefix of the digest
1576
func createDvName(prefix, digest string) (string, error) {
1✔
1577
        digestPrefix := ""
1✔
1578
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1579
                digestPrefix = digestSha256Prefix
1✔
1580
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1581
                digestPrefix = digestUIDPrefix
1✔
1582
        } else {
2✔
1583
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1584
        }
1✔
1585
        fromIdx := len(digestPrefix)
1✔
1586
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1587
        if len(digest) < toIdx {
2✔
1588
                return "", errors.Errorf("Digest is too short")
1✔
1589
        }
1✔
1590
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1591
}
1592

1593
// GetCronJobName get CronJob name based on cron name and UID
1594
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1595
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1596
}
1✔
1597

1598
// GetInitialJobName get initial job name based on cron name and UID
1599
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1600
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1601
}
1✔
1602

1603
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1604
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1605
}
1✔
1606

1607
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1608
        dv := &cron.Spec.Template
1✔
1609

1✔
1610
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
UNCOV
1611
                return explicitVolumeMode, nil
×
UNCOV
1612
        }
×
1613

1614
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1615
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1616
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1617
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1618
                        AccessModes:      accessModes,
1✔
1619
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1620
                        Resources: corev1.VolumeResourceRequirements{
1✔
1621
                                Requests: corev1.ResourceList{
1✔
1622
                                        // Doesn't matter
1✔
1623
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1624
                                },
1✔
1625
                        },
1✔
1626
                },
1✔
1627
        }
1✔
1628
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1629
                return nil, err
×
1630
        }
×
1631

1632
        return inferredPvc.Spec.VolumeMode, nil
1✔
1633
}
1634

1635
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1636
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1637
        if dv.Spec.PVC != nil {
1✔
UNCOV
1638
                return dv.Spec.PVC.VolumeMode
×
UNCOV
1639
        }
×
1640

1641
        if dv.Spec.Storage != nil {
2✔
1642
                return dv.Spec.Storage.VolumeMode
1✔
1643
        }
1✔
1644

UNCOV
1645
        return nil
×
1646
}
1647

1648
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1649
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1650
        if dv.Spec.PVC != nil {
1✔
UNCOV
1651
                return dv.Spec.PVC.AccessModes
×
UNCOV
1652
        }
×
1653

1654
        if dv.Spec.Storage != nil {
2✔
1655
                return dv.Spec.Storage.AccessModes
1✔
1656
        }
1✔
1657

UNCOV
1658
        return nil
×
1659
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc