• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5118

04 Feb 2025 12:01PM UTC coverage: 59.453% (+0.1%) from 59.353%
#5118

push

travis-ci

web-flow
Run go run ./robots/cmd/uploader -workspace /home/prow/go/src/github.com/kubevirt/project-infra/../containerized-data-importer/WORKSPACE -dry-run=false (#3626)

Signed-off-by: kubevirt-bot <kubevirtbot@redhat.com>

16802 of 28261 relevant lines covered (59.45%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.77
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        "github.com/pkg/errors"
33
        cronexpr "github.com/robfig/cron/v3"
34

35
        batchv1 "k8s.io/api/batch/v1"
36
        corev1 "k8s.io/api/core/v1"
37
        storagev1 "k8s.io/api/storage/v1"
38
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
39
        "k8s.io/apimachinery/pkg/api/meta"
40
        "k8s.io/apimachinery/pkg/api/resource"
41
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42
        "k8s.io/apimachinery/pkg/labels"
43
        "k8s.io/apimachinery/pkg/runtime"
44
        "k8s.io/apimachinery/pkg/types"
45
        "k8s.io/client-go/tools/record"
46
        openapicommon "k8s.io/kube-openapi/pkg/common"
47
        "k8s.io/utils/ptr"
48

49
        "sigs.k8s.io/controller-runtime/pkg/client"
50
        "sigs.k8s.io/controller-runtime/pkg/controller"
51
        "sigs.k8s.io/controller-runtime/pkg/event"
52
        "sigs.k8s.io/controller-runtime/pkg/handler"
53
        "sigs.k8s.io/controller-runtime/pkg/manager"
54
        "sigs.k8s.io/controller-runtime/pkg/predicate"
55
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
56
        "sigs.k8s.io/controller-runtime/pkg/source"
57

58
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
59
        "kubevirt.io/containerized-data-importer/pkg/common"
60
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
61
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
62
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
63
        "kubevirt.io/containerized-data-importer/pkg/operator"
64
        "kubevirt.io/containerized-data-importer/pkg/util"
65
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
66
)
67

68
const (
69
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
70
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
71
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
72
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
73
)
74

75
// DataImportCronReconciler members
76
type DataImportCronReconciler struct {
77
        client          client.Client
78
        uncachedClient  client.Client
79
        recorder        record.EventRecorder
80
        scheme          *runtime.Scheme
81
        log             logr.Logger
82
        image           string
83
        pullPolicy      string
84
        cdiNamespace    string
85
        installerLabels map[string]string
86
}
87

88
const (
89
        // AnnSourceDesiredDigest is the digest of the pending updated image
90
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
91
        // AnnImageStreamDockerRef is the ImageStream Docker reference
92
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
93
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
94
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
95
        // AnnLastCronTime is the cron last execution time stamp
96
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
97
        // AnnLastUseTime is the PVC last use time stamp
98
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
99
        // AnnStorageClass is the cron DV's storage class
100
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
101

102
        dataImportControllerName    = "dataimportcron-controller"
103
        digestSha256Prefix          = "sha256:"
104
        digestUIDPrefix             = "uid:"
105
        digestDvNameSuffixLength    = 12
106
        cronJobUIDSuffixLength      = 8
107
        defaultImportsToKeepPerCron = 3
108
)
109

110
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
111

112
// Reconcile loop for DataImportCronReconciler
113
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
114
        dataImportCron := &cdiv1.DataImportCron{}
1✔
115
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
116
                return reconcile.Result{}, err
1✔
117
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
118
                err := r.cleanup(ctx, req.NamespacedName)
1✔
119
                return reconcile.Result{}, err
1✔
120
        }
1✔
121
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
122
        if !shouldReconcile || err != nil {
1✔
123
                return reconcile.Result{}, err
×
124
        }
×
125

126
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
127
                return reconcile.Result{}, err
×
128
        }
×
129

130
        return r.update(ctx, dataImportCron)
1✔
131
}
132

133
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
134
        dataSource := &cdiv1.DataSource{}
1✔
135
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
136
                if k8serrors.IsNotFound(err) {
2✔
137
                        return true, nil
1✔
138
                }
1✔
139
                return false, err
×
140
        }
141
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
142
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
143
                return true, nil
1✔
144
        }
1✔
145
        otherCron := &cdiv1.DataImportCron{}
1✔
146
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
147
                if k8serrors.IsNotFound(err) {
2✔
148
                        return true, nil
1✔
149
                }
1✔
150
                return false, err
×
151
        }
152
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
153
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
154
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
155
                r.log.V(3).Info(msg)
1✔
156
                return false, nil
1✔
157
        }
1✔
158
        return true, nil
×
159
}
160

161
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
162
        if dataImportCron.Spec.Schedule == "" {
2✔
163
                return nil
1✔
164
        }
1✔
165
        if isControllerPolledSource(dataImportCron) {
2✔
166
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
167
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
168
                }
1✔
169
                return nil
1✔
170
        }
171
        if !isURLSource(dataImportCron) {
1✔
172
                return nil
×
173
        }
×
174
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
175
        if exists || err != nil {
2✔
176
                return err
1✔
177
        }
1✔
178
        cronJob, err := r.newCronJob(dataImportCron)
1✔
179
        if err != nil {
1✔
180
                return err
×
181
        }
×
182
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
183
                return err
×
184
        }
×
185
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
186
        if err != nil {
1✔
187
                return err
×
188
        }
×
189
        if err := r.client.Create(ctx, job); err != nil {
1✔
190
                return err
×
191
        }
×
192
        return nil
1✔
193
}
194

195
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
196
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
197
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
198
        }
×
199
        imageStream := &imagev1.ImageStream{}
1✔
200
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
201
        if err != nil {
2✔
202
                return nil, "", err
1✔
203
        }
1✔
204
        imageStreamNamespacedName := types.NamespacedName{
1✔
205
                Namespace: imageStreamNamespace,
1✔
206
                Name:      name,
1✔
207
        }
1✔
208
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
209
                return nil, "", err
1✔
210
        }
1✔
211
        return imageStream, tag, nil
1✔
212
}
213

214
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
215
        if imageStream == nil {
1✔
216
                return "", "", errors.Errorf("No ImageStream")
×
217
        }
×
218
        tags := imageStream.Status.Tags
1✔
219
        if len(tags) == 0 {
1✔
220
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
221
        }
×
222

223
        tagIdx := 0
1✔
224
        if len(imageStreamTag) > 0 {
2✔
225
                tagIdx = -1
1✔
226
                for i, tag := range tags {
2✔
227
                        if tag.Tag == imageStreamTag {
2✔
228
                                tagIdx = i
1✔
229
                                break
1✔
230
                        }
231
                }
232
        }
233
        if tagIdx == -1 {
2✔
234
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
235
        }
1✔
236

237
        if len(tags[tagIdx].Items) == 0 {
2✔
238
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
239
        }
1✔
240
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
241
}
242

243
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
244
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
245
                return imageStreamName, "", nil
1✔
246
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
247
                return subs[0], subs[1], nil
1✔
248
        }
1✔
249
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
250
}
251

252
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
253
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
254
        if nextTimeStr == "" {
1✔
255
                return r.setNextCronTime(dataImportCron)
×
256
        }
×
257
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
258
        if err != nil {
1✔
259
                return reconcile.Result{}, err
×
260
        }
×
261
        if nextTime.After(time.Now()) {
2✔
262
                return r.setNextCronTime(dataImportCron)
1✔
263
        }
1✔
264
        if isImageStreamSource(dataImportCron) {
2✔
265
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
266
                        return reconcile.Result{}, err
1✔
267
                }
1✔
268
        } else if isPvcSource(dataImportCron) {
2✔
269
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
270
                        return reconcile.Result{}, err
1✔
271
                }
1✔
272
        }
273
        return r.setNextCronTime(dataImportCron)
1✔
274
}
275

276
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
277
        now := time.Now()
1✔
278
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
279
        if err != nil {
1✔
280
                return reconcile.Result{}, err
×
281
        }
×
282
        nextTime := expr.Next(now)
1✔
283
        requeueAfter := nextTime.Sub(now)
1✔
284
        res := reconcile.Result{Requeue: true, RequeueAfter: requeueAfter}
1✔
285
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
286
        return res, err
1✔
287
}
288

289
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
290
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
291
        return err == nil && regSource.ImageStream != nil
1✔
292
}
1✔
293

294
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
295
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
296
        return err == nil && regSource.URL != nil
1✔
297
}
1✔
298

299
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
300
        if !isCronRegistrySource(cron) {
2✔
301
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
302
        }
1✔
303
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
304
}
305

306
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
307
        source := cron.Spec.Template.Spec.Source
1✔
308
        return source != nil && source.Registry != nil
1✔
309
}
1✔
310

311
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
312
        if !isPvcSource(cron) {
1✔
313
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
314
        }
×
315
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
316
}
317

318
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
319
        source := cron.Spec.Template.Spec.Source
1✔
320
        return source != nil && source.PVC != nil
1✔
321
}
1✔
322

323
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
324
        return isImageStreamSource(cron) || isPvcSource(cron)
1✔
325
}
1✔
326

327
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
328
        res := reconcile.Result{}
1✔
329

1✔
330
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
331
        if err != nil {
1✔
332
                return res, err
×
333
        }
×
334

335
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
336
        imports := dataImportCron.Status.CurrentImports
1✔
337
        importSucceeded := false
1✔
338

1✔
339
        dataVolume := dataImportCron.Spec.Template
1✔
340
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
341
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
342
        if err != nil {
1✔
343
                return res, err
×
344
        }
×
345
        if desiredStorageClass != nil {
2✔
346
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
347
                        return res, err
1✔
348
                }
1✔
349
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
350
                desiredSc := desiredStorageClass.Name
1✔
351
                if hasCurrent && currentSc != desiredSc {
2✔
352
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
353
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
354
                                return res, err
×
355
                        }
×
356
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
357
                }
358
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
359
        }
360
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
361
        if err != nil {
1✔
362
                return res, err
×
363
        }
×
364
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
365
        if err != nil {
1✔
366
                return res, err
×
367
        }
×
368

369
        handlePopulatedPvc := func() error {
2✔
370
                if pvc != nil {
2✔
371
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
372
                                return err
×
373
                        }
×
374
                }
375
                importSucceeded = true
1✔
376
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
377
                        return err
×
378
                }
×
379

380
                return nil
1✔
381
        }
382

383
        switch {
1✔
384
        case dv != nil:
1✔
385
                switch dv.Status.Phase {
1✔
386
                case cdiv1.Succeeded:
1✔
387
                        if err := handlePopulatedPvc(); err != nil {
1✔
388
                                return res, err
×
389
                        }
×
390
                case cdiv1.ImportScheduled:
1✔
391
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
392
                case cdiv1.ImportInProgress:
1✔
393
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
394
                default:
1✔
395
                        dvPhase := string(dv.Status.Phase)
1✔
396
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
397
                }
398
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
399
                if err := handlePopulatedPvc(); err != nil {
1✔
400
                        return res, err
×
401
                }
×
402
        case snapshot != nil:
1✔
403
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
404
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
405
                                return res, err
×
406
                        }
×
407
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
408
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
409
                }
410
                // Below k8s 1.29 there's no way to know the source volume mode
411
                // Let's at least expose this info on our own snapshots
412
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
413
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
414
                        if err != nil {
1✔
415
                                return res, err
×
416
                        }
×
417
                        if volMode != nil {
2✔
418
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
419
                        }
1✔
420
                }
421
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
422
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
423
                if err != nil {
2✔
424
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
425
                                return res, err
×
426
                        }
×
427
                } else {
1✔
428
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
429
                }
1✔
430
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
431
                        return res, err
×
432
                }
×
433
                importSucceeded = true
1✔
434
        default:
1✔
435
                if len(imports) > 0 {
2✔
436
                        imports = imports[1:]
1✔
437
                        dataImportCron.Status.CurrentImports = imports
1✔
438
                }
1✔
439
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
440
        }
441

442
        if importSucceeded {
2✔
443
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
444
                        return res, err
×
445
                }
×
446
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
447
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
448
                        return res, err
×
449
                }
×
450
        }
451

452
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
453
                return res, err
×
454
        }
×
455

456
        // Skip if schedule is disabled
457
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
458
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
459
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
460
                if err != nil {
2✔
461
                        return pollRes, err
1✔
462
                }
1✔
463
                res = pollRes
1✔
464
        }
465

466
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
467
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
468
        if digestUpdated {
2✔
469
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
470
                if dv != nil {
1✔
471
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
472
                                return res, err
×
473
                        }
×
474
                }
475
                if importSucceeded || len(imports) == 0 {
2✔
476
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
477
                                return res, err
1✔
478
                        }
1✔
479
                }
480
        } else if importSucceeded {
2✔
481
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
482
                        return res, err
×
483
                }
×
484
        } else if len(imports) > 0 {
2✔
485
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
486
        } else {
2✔
487
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
488
        }
1✔
489

490
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
491
                return res, err
×
492
        }
×
493

494
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
495
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
496
                        return res, err
×
497
                }
×
498
        }
499
        return res, nil
1✔
500
}
501

502
// Returns the current import DV if exists, and the last imported PVC
503
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
504
        imports := cron.Status.CurrentImports
1✔
505
        if len(imports) == 0 {
2✔
506
                return nil, nil, nil
1✔
507
        }
1✔
508

509
        dvName := imports[0].DataVolumeName
1✔
510
        dv := &cdiv1.DataVolume{}
1✔
511
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
512
                if !k8serrors.IsNotFound(err) {
1✔
513
                        return nil, nil, err
×
514
                }
×
515
                dv = nil
1✔
516
        }
517

518
        pvc := &corev1.PersistentVolumeClaim{}
1✔
519
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
520
                if !k8serrors.IsNotFound(err) {
1✔
521
                        return nil, nil, err
×
522
                }
×
523
                pvc = nil
1✔
524
        }
525
        return dv, pvc, nil
1✔
526
}
527

528
// Returns the current import DV if exists, and the last imported PVC
529
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
530
        imports := cron.Status.CurrentImports
1✔
531
        if len(imports) == 0 {
2✔
532
                return nil, nil
1✔
533
        }
1✔
534

535
        snapName := imports[0].DataVolumeName
1✔
536
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
537
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
538
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
539
                        return nil, err
×
540
                }
×
541
                return nil, nil
1✔
542
        }
543

544
        return snapshot, nil
1✔
545
}
546

547
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
548
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
549
        dataSource := &cdiv1.DataSource{}
1✔
550
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
551
                return nil, err
1✔
552
        }
1✔
553
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
554
                log := r.log.WithName("getCronManagedDataSource")
×
555
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
556
                return nil, ErrNotManagedByCron
×
557
        }
×
558
        return dataSource, nil
1✔
559
}
560

561
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
562
        objCopy := obj.DeepCopyObject()
1✔
563
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
564
        r.setDataImportCronResourceLabels(cron, obj)
1✔
565
        if !reflect.DeepEqual(obj, objCopy) {
2✔
566
                if err := r.client.Update(ctx, obj); err != nil {
1✔
567
                        return err
×
568
                }
×
569
        }
570
        return nil
1✔
571
}
572

573
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
574
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
575
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
576
                if cond.Status == corev1.ConditionFalse &&
×
577
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
578
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
579
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
580
                        dv.Labels[common.DataImportCronLabel] = ""
×
581
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
582
                                return err
×
583
                        }
×
584
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
585
                                return err
×
586
                        }
×
587
                        cron.Status.CurrentImports = nil
×
588
                }
589
        }
590
        return nil
×
591
}
592

593
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
594
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
595
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
596
        if err != nil {
1✔
597
                return err
×
598
        }
×
599
        if regSource.ImageStream == nil {
1✔
600
                return nil
×
601
        }
×
602
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
603
        if err != nil {
2✔
604
                return err
1✔
605
        }
1✔
606
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
607
        if err != nil {
2✔
608
                return err
1✔
609
        }
1✔
610
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
611
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
612
                log.Info("Updating DataImportCron", "digest", digest)
1✔
613
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
614
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
615
        }
1✔
616
        return nil
1✔
617
}
618

619
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
620
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
621
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
622
        if err != nil {
1✔
623
                return err
×
624
        }
×
625
        ns := pvcSource.Namespace
1✔
626
        if ns == "" {
2✔
627
                ns = dataImportCron.Namespace
1✔
628
        }
1✔
629
        pvc := &corev1.PersistentVolumeClaim{}
1✔
630
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
631
                return err
1✔
632
        }
1✔
633
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
634
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
635
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
636
                log.Info("Updating DataImportCron", "digest", digest)
1✔
637
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
638
        }
1✔
639
        return nil
1✔
640
}
641

642
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
643
        log := r.log.WithName("updateDataSource")
1✔
644
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
645
        if err != nil {
2✔
646
                if k8serrors.IsNotFound(err) {
2✔
647
                        dataSource = r.newDataSource(dataImportCron)
1✔
648
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
649
                                return err
×
650
                        }
×
651
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
652
                } else if errors.Is(err, ErrNotManagedByCron) {
×
653
                        return nil
×
654
                } else {
×
655
                        return err
×
656
                }
×
657
        }
658
        dataSourceCopy := dataSource.DeepCopy()
1✔
659
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
660

1✔
661
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
662
        populateDataSource(format, dataSource, sourcePVC)
1✔
663

1✔
664
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
665
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
666
                        return err
×
667
                }
×
668
        }
669

670
        return nil
1✔
671
}
672

673
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
674
        if sourcePVC == nil {
2✔
675
                return
1✔
676
        }
1✔
677

678
        switch format {
1✔
679
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
680
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
681
                        PVC: sourcePVC,
1✔
682
                }
1✔
683
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
684
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
685
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
686
                                Namespace: sourcePVC.Namespace,
1✔
687
                                Name:      sourcePVC.Name,
1✔
688
                        },
1✔
689
                }
1✔
690
        }
691
}
692

693
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
694
        if dataImportCron.Status.CurrentImports == nil {
1✔
695
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
696
        }
×
697
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
698
                Namespace: dataImportCron.Namespace,
1✔
699
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
700
        }
1✔
701
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
702
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
703
                now := metav1.Now()
1✔
704
                dataImportCron.Status.LastImportTimestamp = &now
1✔
705
        }
1✔
706
        return nil
1✔
707
}
708

709
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
710
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
711
        if lastTimeStr == "" {
2✔
712
                return nil
1✔
713
        }
1✔
714
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
715
        if err != nil {
1✔
716
                return err
×
717
        }
×
718
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
719
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
720
        }
1✔
721
        return nil
1✔
722
}
723

724
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
725
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
726
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
727
        if digest == "" {
1✔
728
                return nil
×
729
        }
×
730
        dvName, err := createDvName(dataSourceName, digest)
1✔
731
        if err != nil {
2✔
732
                return err
1✔
733
        }
1✔
734
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
735

1✔
736
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
737
        for _, src := range sources {
2✔
738
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
739
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
740
                                return err
×
741
                        }
×
742
                } else {
1✔
743
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
744
                                return err
×
745
                        }
×
746
                        // If source exists don't create DV
747
                        return nil
1✔
748
                }
749
        }
750

751
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
752
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
753
                return err
×
754
        }
×
755

756
        return nil
1✔
757
}
758

759
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
760
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
761
        if !ok {
1✔
762
                // nothing to delete
×
763
                return nil
×
764
        }
×
765
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
766
        if err != nil {
1✔
767
                return err
×
768
        }
×
769
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
770
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
771
        for _, src := range sources {
2✔
772
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
773
                        return err
×
774
                }
×
775
        }
776
        for _, src := range sources {
2✔
777
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
778
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
779
                }
×
780
        }
781
        // Only update desired storage class once garbage collection went through
782
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
783
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
784
        if err != nil {
1✔
785
                return err
×
786
        }
×
787

788
        return nil
1✔
789
}
790

791
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
792
        switch format {
1✔
793
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
794
                return nil
1✔
795
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
796
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
797
        default:
×
798
                return fmt.Errorf("unknown source format for snapshot")
×
799
        }
800
}
801

802
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
803
        if pvc == nil {
1✔
804
                return nil
×
805
        }
×
806
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
807
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
808
                return nil
1✔
809
        }
1✔
810
        storageProfile := &cdiv1.StorageProfile{}
1✔
811
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
812
                return err
×
813
        }
×
814
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
815
        if err != nil {
1✔
816
                return err
×
817
        }
×
818
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
819
                ObjectMeta: metav1.ObjectMeta{
1✔
820
                        Name:      pvc.Name,
1✔
821
                        Namespace: dataImportCron.Namespace,
1✔
822
                        Labels: map[string]string{
1✔
823
                                common.CDILabelKey:       common.CDILabelValue,
1✔
824
                                common.CDIComponentLabel: "",
1✔
825
                        },
1✔
826
                },
1✔
827
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
828
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
829
                                PersistentVolumeClaimName: &pvc.Name,
1✔
830
                        },
1✔
831
                        VolumeSnapshotClassName: &className,
1✔
832
                },
1✔
833
        }
1✔
834
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
835
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
836

1✔
837
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
838
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
839
                if !k8serrors.IsNotFound(err) {
1✔
840
                        return err
×
841
                }
×
842
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
843
                if pvc.Spec.VolumeMode != nil {
2✔
844
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
845
                }
1✔
846
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
847
                        return err
×
848
                }
×
849
        } else {
1✔
850
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
851
                        // Clean up DV/PVC as they are not needed anymore
1✔
852
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
853
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
854
                                return err
×
855
                        }
×
856
                }
857
        }
858

859
        return nil
1✔
860
}
861

862
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
863
        dataImportCron.Status.SourceFormat = &format
1✔
864

1✔
865
        switch format {
1✔
866
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
867
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
868
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
869
                if snapshot == nil {
2✔
870
                        // Snapshot create/update will trigger reconcile
1✔
871
                        return nil
1✔
872
                }
1✔
873
                if cc.IsSnapshotReady(snapshot) {
2✔
874
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
875
                } else {
2✔
876
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
877
                }
1✔
878
        default:
×
879
                return fmt.Errorf("unknown source format for snapshot")
×
880
        }
881

882
        return nil
1✔
883
}
884

885
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
886
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
887
        if desiredStorageClass == nil {
2✔
888
                return format, nil
1✔
889
        }
1✔
890

891
        storageProfile := &cdiv1.StorageProfile{}
1✔
892
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
893
                return format, err
×
894
        }
×
895
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
896
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
897
        }
1✔
898

899
        return format, nil
1✔
900
}
901

902
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
903
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
904
                return nil
×
905
        }
×
906
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
907
        if err != nil {
1✔
908
                return err
×
909
        }
×
910

911
        maxImports := defaultImportsToKeepPerCron
1✔
912

1✔
913
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
914
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
915
        }
1✔
916

917
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
918
                return err
×
919
        }
×
920
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
921
                return err
×
922
        }
×
923

924
        return nil
1✔
925
}
926

927
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
928
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
929

1✔
930
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
931
                return err
×
932
        }
×
933
        if len(pvcList.Items) > maxImports {
2✔
934
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
935
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
936
                })
1✔
937
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
938
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
939
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
940
                                return err
×
941
                        }
×
942
                }
943
        }
944

945
        dvList := &cdiv1.DataVolumeList{}
1✔
946
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
947
                return err
×
948
        }
×
949

950
        if len(dvList.Items) > maxImports {
2✔
951
                for _, dv := range dvList.Items {
2✔
952
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
953
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
954
                                return err
×
955
                        }
×
956

957
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
958
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
959
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
960
                                        return err
×
961
                                }
×
962
                        }
963
                }
964
        }
965

966
        return nil
1✔
967
}
968

969
// deleteDvPvc deletes DV or PVC if DV was GCed
970
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
971
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
972
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
973
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
974
                return err
1✔
975
        }
1✔
976
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
977
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
978
                return err
×
979
        }
×
980
        return nil
1✔
981
}
982

983
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
984
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
985

1✔
986
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
987
                if meta.IsNoMatchError(err) {
×
988
                        return nil
×
989
                }
×
990
                return err
×
991
        }
992
        if len(snapList.Items) > maxImports {
1✔
993
                sort.Slice(snapList.Items, func(i, j int) bool {
×
994
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
995
                })
×
996
                for _, snap := range snapList.Items[maxImports:] {
×
997
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
998
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
999
                                return err
×
1000
                        }
×
1001
                }
1002
        }
1003

1004
        return nil
1✔
1005
}
1006

1007
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1008
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1009
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1010

1✔
1011
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1012
                return err
×
1013
        }
×
1014
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1015
        if err != nil {
1✔
1016
                return err
×
1017
        }
×
1018
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1019
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1020
                return err
×
1021
        }
×
1022
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1023
                return err
×
1024
        }
×
1025
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1026
                return err
×
1027
        }
×
1028
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1029
                return err
×
1030
        }
×
1031
        return nil
1✔
1032
}
1033

1034
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1035
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1036
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1037
        if err != nil {
1✔
1038
                return err
×
1039
        }
×
1040
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1041
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1042
                return err
×
1043
        }
×
1044
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1045
                return err
×
1046
        }
×
1047

1048
        return nil
1✔
1049
}
1050

1051
// NewDataImportCronController creates a new instance of the DataImportCron controller
1052
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1053
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1054
                Scheme: mgr.GetScheme(),
×
1055
                Mapper: mgr.GetRESTMapper(),
×
1056
        })
×
1057
        if err != nil {
×
1058
                return nil, err
×
1059
        }
×
1060
        reconciler := &DataImportCronReconciler{
×
1061
                client:          mgr.GetClient(),
×
1062
                uncachedClient:  uncachedClient,
×
1063
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1064
                scheme:          mgr.GetScheme(),
×
1065
                log:             log.WithName(dataImportControllerName),
×
1066
                image:           importerImage,
×
1067
                pullPolicy:      pullPolicy,
×
1068
                cdiNamespace:    util.GetNamespace(),
×
1069
                installerLabels: installerLabels,
×
1070
        }
×
1071
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1072
                MaxConcurrentReconciles: 3,
×
1073
                Reconciler:              reconciler,
×
1074
        })
×
1075
        if err != nil {
×
1076
                return nil, err
×
1077
        }
×
1078
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1079
                return nil, err
×
1080
        }
×
1081
        log.Info("Initialized DataImportCron controller")
×
1082
        return dataImportCronController, nil
×
1083
}
1084

1085
func getCronName(obj client.Object) string {
×
1086
        return obj.GetLabels()[common.DataImportCronLabel]
×
1087
}
×
1088

1089
func getCronNs(obj client.Object) string {
×
1090
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1091
}
×
1092

1093
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1094
        if cronName := getCronName(obj); cronName != "" {
×
1095
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1096
        }
×
1097
        return nil
×
1098
}
1099

1100
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1101
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1102
                return err
×
1103
        }
×
1104

1105
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1106
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1107
                // Otherwise we risk losing the storage profile event
×
1108
                var crons cdiv1.DataImportCronList
×
1109
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1110
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1111
                        return nil
×
1112
                }
×
1113
                // Storage profiles are 1:1 to storage classes
1114
                scName := obj.GetName()
×
1115
                var reqs []reconcile.Request
×
1116
                for _, cron := range crons.Items {
×
1117
                        dataVolume := cron.Spec.Template
×
1118
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1119
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1120
                        if err != nil || templateSc == nil {
×
1121
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1122
                                return reqs
×
1123
                        }
×
1124
                        if templateSc.Name == scName {
×
1125
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1126
                        }
×
1127
                }
1128
                return reqs
×
1129
        }
1130

1131
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1132
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1133
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1134
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1135
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1136
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1137
                },
1138
        )); err != nil {
×
1139
                return err
×
1140
        }
×
1141

1142
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1143
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1144
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1145
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1146
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1147
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1148
                },
1149
        )); err != nil {
×
1150
                return err
×
1151
        }
×
1152

1153
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1154
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1155
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1156
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1157
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1158
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1159
                },
1160
        )); err != nil {
×
1161
                return err
×
1162
        }
×
1163

1164
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1165
                return err
×
1166
        }
×
1167

1168
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1169
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1170
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1171
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1172
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1173
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1174
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
1175
                        },
×
1176
                },
1177
        )); err != nil {
×
1178
                return err
×
1179
        }
×
1180

1181
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1182
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1183
        }
×
1184

1185
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1186
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1187
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1188
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1189
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1190
                        },
×
1191
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1192
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1193
                },
1194
        )); err != nil {
×
1195
                return err
×
1196
        }
×
1197

1198
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1199
                if meta.IsNoMatchError(err) {
×
1200
                        // Back out if there's no point to attempt watch
×
1201
                        return nil
×
1202
                }
×
1203
                if !cc.IsErrCacheNotStarted(err) {
×
1204
                        return err
×
1205
                }
×
1206
        }
1207
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1208
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1209
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1210
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1211
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1212
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1213
                },
1214
        )); err != nil {
×
1215
                return err
×
1216
        }
×
1217

1218
        return nil
×
1219
}
1220

1221
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1222
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1223
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1224
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1225
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1226
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1227
                                log.Info("Update", "sc", obj.GetName(),
×
1228
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1229
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1230
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1231
                                if err != nil {
×
1232
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1233
                                }
×
1234
                                return reqs
×
1235
                        },
1236
                ),
1237
                predicate.TypedFuncs[*storagev1.StorageClass]{
1238
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1239
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1240
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1241
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1242
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1243
                        },
×
1244
                },
1245
        )); err != nil {
×
1246
                return err
×
1247
        }
×
1248

1249
        return nil
×
1250
}
1251

1252
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1253
        dicList := &cdiv1.DataImportCronList{}
×
1254
        if err := c.List(ctx, dicList); err != nil {
×
1255
                return nil, err
×
1256
        }
×
1257
        reqs := []reconcile.Request{}
×
1258
        for _, dic := range dicList.Items {
×
1259
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1260
                        continue
×
1261
                }
1262

1263
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1264
        }
1265

1266
        return reqs, nil
×
1267
}
1268

1269
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1270
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1271
                return false, nil
1✔
1272
        }
1✔
1273

1274
        sc := pvc.Spec.StorageClassName
1✔
1275
        if sc == nil || *sc == desiredStorageClass {
2✔
1276
                return false, nil
1✔
1277
        }
1✔
1278

1279
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1280
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1281
                return false, err
×
1282
        }
×
1283

1284
        return true, nil
1✔
1285
}
1286

1287
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1288
        cronJob := &batchv1.CronJob{}
1✔
1289
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1290
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1291
                return false, cc.IgnoreNotFound(err)
1✔
1292
        }
1✔
1293

1294
        cronJobCopy := cronJob.DeepCopy()
1✔
1295
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1296
                return false, err
×
1297
        }
×
1298

1299
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1300
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1301
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1302
                        return false, cc.IgnoreNotFound(err)
×
1303
                }
×
1304
        }
1305
        return true, nil
1✔
1306
}
1307

1308
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1309
        cronJob := &batchv1.CronJob{
1✔
1310
                ObjectMeta: metav1.ObjectMeta{
1✔
1311
                        Name:      GetCronJobName(cron),
1✔
1312
                        Namespace: r.cdiNamespace,
1✔
1313
                },
1✔
1314
        }
1✔
1315
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1316
                return nil, err
×
1317
        }
×
1318
        return cronJob, nil
1✔
1319
}
1320

1321
// InitPollerPodSpec inits poller PodSpec
1322
func InitPollerPodSpec(c client.Client, cron *cdiv1.DataImportCron, podSpec *corev1.PodSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1323
        regSource, err := getCronRegistrySource(cron)
1✔
1324
        if err != nil {
1✔
1325
                return err
×
1326
        }
×
1327
        if regSource.URL == nil {
1✔
1328
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1329
        }
×
1330
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1331
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1332
                return err
×
1333
        }
×
1334
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1335
        if err != nil {
1✔
1336
                return err
×
1337
        }
×
1338
        container := corev1.Container{
1✔
1339
                Name:  "cdi-source-update-poller",
1✔
1340
                Image: image,
1✔
1341
                Command: []string{
1✔
1342
                        "/usr/bin/cdi-source-update-poller",
1✔
1343
                        "-ns", cron.Namespace,
1✔
1344
                        "-cron", cron.Name,
1✔
1345
                        "-url", *regSource.URL,
1✔
1346
                },
1✔
1347
                ImagePullPolicy:          pullPolicy,
1✔
1348
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1349
                TerminationMessagePolicy: corev1.TerminationMessageReadFile,
1✔
1350
        }
1✔
1351

1✔
1352
        var volumes []corev1.Volume
1✔
1353
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1354
        if hasCertConfigMap {
1✔
1355
                vm := corev1.VolumeMount{
×
1356
                        Name:      CertVolName,
×
1357
                        MountPath: common.ImporterCertDir,
×
1358
                }
×
1359
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1360
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1361
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1362
        }
×
1363

1364
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1365
                vm := corev1.VolumeMount{
1✔
1366
                        Name:      ProxyCertVolName,
1✔
1367
                        MountPath: common.ImporterProxyCertDir,
1✔
1368
                }
1✔
1369
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1370
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1371
        }
1✔
1372

1373
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1374
                container.Env = append(container.Env,
×
1375
                        corev1.EnvVar{
×
1376
                                Name: common.ImporterAccessKeyID,
×
1377
                                ValueFrom: &corev1.EnvVarSource{
×
1378
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1379
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1380
                                                        Name: *regSource.SecretRef,
×
1381
                                                },
×
1382
                                                Key: common.KeyAccess,
×
1383
                                        },
×
1384
                                },
×
1385
                        },
×
1386
                        corev1.EnvVar{
×
1387
                                Name: common.ImporterSecretKey,
×
1388
                                ValueFrom: &corev1.EnvVarSource{
×
1389
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1390
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1391
                                                        Name: *regSource.SecretRef,
×
1392
                                                },
×
1393
                                                Key: common.KeySecret,
×
1394
                                        },
×
1395
                                },
×
1396
                        },
×
1397
                )
×
1398
        }
×
1399

1400
        addEnvVar := func(varName, value string) {
2✔
1401
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1402
        }
1✔
1403

1404
        if insecureTLS {
1✔
1405
                addEnvVar(common.InsecureTLSVar, "true")
×
1406
        }
×
1407

1408
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1409
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1410
                        addEnvVar(varName, value)
1✔
1411
                }
1✔
1412
        }
1413

1414
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1415
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1416
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1417

1✔
1418
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1419
        if err != nil {
1✔
1420
                return err
×
1421
        }
×
1422
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1423
        if err != nil {
1✔
1424
                return err
×
1425
        }
×
1426

1427
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1428
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1429
        podSpec.Containers = []corev1.Container{container}
1✔
1430
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1431
        podSpec.Volumes = volumes
1✔
1432
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1433
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1434
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1435
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1436

1✔
1437
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1438

1✔
1439
        return nil
1✔
1440
}
1441

1442
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1443
        cronJobSpec := &cronJob.Spec
1✔
1444
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1445
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1446
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1447
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1448

1✔
1449
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1450
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1451
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1452

1✔
1453
        podSpec := &jobSpec.Template.Spec
1✔
1454
        if err := InitPollerPodSpec(r.client, cron, podSpec, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1455
                return err
×
1456
        }
×
1457
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1458
                return err
×
1459
        }
×
1460
        return nil
1✔
1461
}
1462

1463
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1464
        job := &batchv1.Job{
1✔
1465
                ObjectMeta: metav1.ObjectMeta{
1✔
1466
                        Name:      GetInitialJobName(cron),
1✔
1467
                        Namespace: cronJob.Namespace,
1✔
1468
                },
1✔
1469
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1470
        }
1✔
1471
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1472
                return nil, err
×
1473
        }
×
1474
        return job, nil
1✔
1475
}
1476

1477
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1478
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1479
                return err
×
1480
        }
×
1481
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1482
        labels := obj.GetLabels()
1✔
1483
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1484
        labels[common.DataImportCronLabel] = cron.Name
1✔
1485
        obj.SetLabels(labels)
1✔
1486
        return nil
1✔
1487
}
1488

1489
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1490
        dv := cron.Spec.Template.DeepCopy()
1✔
1491
        if isCronRegistrySource(cron) {
2✔
1492
                var digestedURL string
1✔
1493
                if isURLSource(cron) {
2✔
1494
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1495
                } else if isImageStreamSource(cron) {
3✔
1496
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1497
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1498
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1499
                }
1✔
1500
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1501
        }
1502
        dv.Name = dataVolumeName
1✔
1503
        dv.Namespace = cron.Namespace
1✔
1504
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1505
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1506
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1507
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1508

1✔
1509
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1510
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1511
        }
1✔
1512

1513
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1514

1✔
1515
        return dv
1✔
1516
}
1517

1518
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1519
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1520
        labels := obj.GetLabels()
1✔
1521
        labels[common.DataImportCronLabel] = cron.Name
1✔
1522
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1523
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1524
        }
1✔
1525
        obj.SetLabels(labels)
1✔
1526
}
1527

1528
func untagDigestedDockerURL(dockerURL string) string {
1✔
1529
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1530
                url := u.Host + u.Path
1✔
1531
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1532
                // Check for tag
1✔
1533
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1534
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1535
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1536
                        }
1✔
1537
                }
1538
        }
1539
        return dockerURL
1✔
1540
}
1541

1542
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1543
        if val := cron.Labels[ann]; val != "" {
2✔
1544
                cc.AddLabel(dv, ann, val)
1✔
1545
        }
1✔
1546
}
1547

1548
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1549
        if val := cron.Annotations[ann]; val != "" {
1✔
1550
                cc.AddAnnotation(dv, ann, val)
×
1551
        }
×
1552
}
1553

1554
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1555
        dataSource := &cdiv1.DataSource{
1✔
1556
                ObjectMeta: metav1.ObjectMeta{
1✔
1557
                        Name:      cron.Spec.ManagedDataSource,
1✔
1558
                        Namespace: cron.Namespace,
1✔
1559
                },
1✔
1560
        }
1✔
1561
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1562
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1563
        return dataSource
1✔
1564
}
1✔
1565

1566
// Create DataVolume name based on the DataSource name + prefix of the digest
1567
func createDvName(prefix, digest string) (string, error) {
1✔
1568
        digestPrefix := ""
1✔
1569
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1570
                digestPrefix = digestSha256Prefix
1✔
1571
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1572
                digestPrefix = digestUIDPrefix
1✔
1573
        } else {
2✔
1574
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1575
        }
1✔
1576
        fromIdx := len(digestPrefix)
1✔
1577
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1578
        if len(digest) < toIdx {
2✔
1579
                return "", errors.Errorf("Digest is too short")
1✔
1580
        }
1✔
1581
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1582
}
1583

1584
// GetCronJobName get CronJob name based on cron name and UID
1585
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1586
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1587
}
1✔
1588

1589
// GetInitialJobName get initial job name based on cron name and UID
1590
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1591
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1592
}
1✔
1593

1594
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1595
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1596
}
1✔
1597

1598
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1599
        dv := &cron.Spec.Template
1✔
1600

1✔
1601
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1602
                return explicitVolumeMode, nil
×
1603
        }
×
1604

1605
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1606
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1607
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1608
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1609
                        AccessModes:      accessModes,
1✔
1610
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1611
                        Resources: corev1.VolumeResourceRequirements{
1✔
1612
                                Requests: corev1.ResourceList{
1✔
1613
                                        // Doesn't matter
1✔
1614
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1615
                                },
1✔
1616
                        },
1✔
1617
                },
1✔
1618
        }
1✔
1619
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1620
                return nil, err
×
1621
        }
×
1622

1623
        return inferredPvc.Spec.VolumeMode, nil
1✔
1624
}
1625

1626
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1627
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1628
        if dv.Spec.PVC != nil {
1✔
1629
                return dv.Spec.PVC.VolumeMode
×
1630
        }
×
1631

1632
        if dv.Spec.Storage != nil {
2✔
1633
                return dv.Spec.Storage.VolumeMode
1✔
1634
        }
1✔
1635

1636
        return nil
×
1637
}
1638

1639
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1640
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1641
        if dv.Spec.PVC != nil {
1✔
1642
                return dv.Spec.PVC.AccessModes
×
1643
        }
×
1644

1645
        if dv.Spec.Storage != nil {
2✔
1646
                return dv.Spec.Storage.AccessModes
1✔
1647
        }
1✔
1648

1649
        return nil
×
1650
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc