• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4835

08 Aug 2024 09:29PM UTC coverage: 59.157% (-0.007%) from 59.164%
#4835

push

travis-ci

web-flow
Add memory limit workaround (#3349)

Signed-off-by: Thomas-David Griedel <griedel911@gmail.com>

16468 of 27838 relevant lines covered (59.16%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.32
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        "github.com/pkg/errors"
33
        cronexpr "github.com/robfig/cron/v3"
34

35
        batchv1 "k8s.io/api/batch/v1"
36
        corev1 "k8s.io/api/core/v1"
37
        storagev1 "k8s.io/api/storage/v1"
38
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
39
        "k8s.io/apimachinery/pkg/api/meta"
40
        "k8s.io/apimachinery/pkg/api/resource"
41
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42
        "k8s.io/apimachinery/pkg/labels"
43
        "k8s.io/apimachinery/pkg/runtime"
44
        "k8s.io/apimachinery/pkg/types"
45
        "k8s.io/client-go/tools/record"
46
        "k8s.io/utils/ptr"
47

48
        "sigs.k8s.io/controller-runtime/pkg/client"
49
        "sigs.k8s.io/controller-runtime/pkg/controller"
50
        "sigs.k8s.io/controller-runtime/pkg/event"
51
        "sigs.k8s.io/controller-runtime/pkg/handler"
52
        "sigs.k8s.io/controller-runtime/pkg/manager"
53
        "sigs.k8s.io/controller-runtime/pkg/predicate"
54
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
55
        "sigs.k8s.io/controller-runtime/pkg/source"
56

57
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
58
        "kubevirt.io/containerized-data-importer/pkg/common"
59
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
60
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
61
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
62
        "kubevirt.io/containerized-data-importer/pkg/operator"
63
        "kubevirt.io/containerized-data-importer/pkg/util"
64
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
65
)
66

67
const (
68
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
69
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
70
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
71
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
72
)
73

74
// DataImportCronReconciler members
75
type DataImportCronReconciler struct {
76
        client          client.Client
77
        uncachedClient  client.Client
78
        recorder        record.EventRecorder
79
        scheme          *runtime.Scheme
80
        log             logr.Logger
81
        image           string
82
        pullPolicy      string
83
        cdiNamespace    string
84
        installerLabels map[string]string
85
}
86

87
const (
88
        // AnnSourceDesiredDigest is the digest of the pending updated image
89
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
90
        // AnnImageStreamDockerRef is the ImageStream Docker reference
91
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
92
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
93
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
94
        // AnnLastCronTime is the cron last execution time stamp
95
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
96
        // AnnLastUseTime is the PVC last use time stamp
97
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
98
        // AnnStorageClass is the cron DV's storage class
99
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
100

101
        dataImportControllerName    = "dataimportcron-controller"
102
        digestPrefix                = "sha256:"
103
        digestDvNameSuffixLength    = 12
104
        cronJobUIDSuffixLength      = 8
105
        defaultImportsToKeepPerCron = 3
106
)
107

108
// Reconcile loop for DataImportCronReconciler
109
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
110
        dataImportCron := &cdiv1.DataImportCron{}
1✔
111
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
112
                return reconcile.Result{}, err
1✔
113
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
114
                err := r.cleanup(ctx, req.NamespacedName)
1✔
115
                return reconcile.Result{}, err
1✔
116
        }
1✔
117
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
118
        if !shouldReconcile || err != nil {
1✔
119
                return reconcile.Result{}, err
×
120
        }
×
121

122
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
123
                return reconcile.Result{}, err
×
124
        }
×
125

126
        return r.update(ctx, dataImportCron)
1✔
127
}
128

129
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
130
        dataSource := &cdiv1.DataSource{}
1✔
131
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
132
                if k8serrors.IsNotFound(err) {
2✔
133
                        return true, nil
1✔
134
                }
1✔
135
                return false, err
×
136
        }
137
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
138
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
139
                return true, nil
1✔
140
        }
1✔
141
        otherCron := &cdiv1.DataImportCron{}
1✔
142
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
143
                if k8serrors.IsNotFound(err) {
2✔
144
                        return true, nil
1✔
145
                }
1✔
146
                return false, err
×
147
        }
148
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
149
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
150
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
151
                r.log.V(3).Info(msg)
1✔
152
                return false, nil
1✔
153
        }
1✔
154
        return true, nil
×
155
}
156

157
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
158
        if dataImportCron.Spec.Schedule == "" {
2✔
159
                return nil
1✔
160
        }
1✔
161
        if isImageStreamSource(dataImportCron) {
2✔
162
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
163
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
164
                }
1✔
165
                return nil
1✔
166
        }
167
        if !isURLSource(dataImportCron) {
1✔
168
                return nil
×
169
        }
×
170
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
171
        if exists || err != nil {
2✔
172
                return err
1✔
173
        }
1✔
174
        cronJob, err := r.newCronJob(dataImportCron)
1✔
175
        if err != nil {
1✔
176
                return err
×
177
        }
×
178
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
179
                return err
×
180
        }
×
181
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
182
        if err != nil {
1✔
183
                return err
×
184
        }
×
185
        if err := r.client.Create(ctx, job); err != nil {
1✔
186
                return err
×
187
        }
×
188
        return nil
1✔
189
}
190

191
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
192
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
193
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
194
        }
×
195
        imageStream := &imagev1.ImageStream{}
1✔
196
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
197
        if err != nil {
2✔
198
                return nil, "", err
1✔
199
        }
1✔
200
        imageStreamNamespacedName := types.NamespacedName{
1✔
201
                Namespace: imageStreamNamespace,
1✔
202
                Name:      name,
1✔
203
        }
1✔
204
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
205
                return nil, "", err
1✔
206
        }
1✔
207
        return imageStream, tag, nil
1✔
208
}
209

210
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
211
        if imageStream == nil {
1✔
212
                return "", "", errors.Errorf("No ImageStream")
×
213
        }
×
214
        tags := imageStream.Status.Tags
1✔
215
        if len(tags) == 0 {
1✔
216
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
217
        }
×
218

219
        tagIdx := 0
1✔
220
        if len(imageStreamTag) > 0 {
2✔
221
                tagIdx = -1
1✔
222
                for i, tag := range tags {
2✔
223
                        if tag.Tag == imageStreamTag {
2✔
224
                                tagIdx = i
1✔
225
                                break
1✔
226
                        }
227
                }
228
        }
229
        if tagIdx == -1 {
2✔
230
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
231
        }
1✔
232

233
        if len(tags[tagIdx].Items) == 0 {
2✔
234
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
235
        }
1✔
236
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
237
}
238

239
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
240
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
241
                return imageStreamName, "", nil
1✔
242
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
243
                return subs[0], subs[1], nil
1✔
244
        }
1✔
245
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
246
}
247

248
func (r *DataImportCronReconciler) pollImageStreamDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
249
        if nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]; nextTimeStr != "" {
2✔
250
                nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
251
                if err != nil {
1✔
252
                        return reconcile.Result{}, err
×
253
                }
×
254
                if nextTime.Before(time.Now()) {
2✔
255
                        if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
256
                                return reconcile.Result{}, err
1✔
257
                        }
1✔
258
                }
259
        }
260
        return r.setNextCronTime(dataImportCron)
1✔
261
}
262

263
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
264
        now := time.Now()
1✔
265
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
266
        if err != nil {
1✔
267
                return reconcile.Result{}, err
×
268
        }
×
269
        nextTime := expr.Next(now)
1✔
270
        requeueAfter := nextTime.Sub(now)
1✔
271
        res := reconcile.Result{Requeue: true, RequeueAfter: requeueAfter}
1✔
272
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
273
        return res, err
1✔
274
}
275

276
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
277
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
278
        return err == nil && regSource.ImageStream != nil
1✔
279
}
1✔
280

281
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
282
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
283
        return err == nil && regSource.URL != nil
1✔
284
}
1✔
285

286
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
287
        source := cron.Spec.Template.Spec.Source
1✔
288
        if source == nil || source.Registry == nil {
1✔
289
                return nil, errors.Errorf("Cron with no registry source %s", cron.Name)
×
290
        }
×
291
        return source.Registry, nil
1✔
292
}
293

294
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
295
        res := reconcile.Result{}
1✔
296

1✔
297
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
298
        if err != nil {
1✔
299
                return res, err
×
300
        }
×
301

302
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
303
        imports := dataImportCron.Status.CurrentImports
1✔
304
        importSucceeded := false
1✔
305

1✔
306
        dataVolume := dataImportCron.Spec.Template
1✔
307
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
308
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
309
        if err != nil {
1✔
310
                return res, err
×
311
        }
×
312
        if desiredStorageClass != nil {
2✔
313
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
314
                        return res, err
1✔
315
                }
1✔
316
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
317
        }
318
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
319
        if err != nil {
1✔
320
                return res, err
×
321
        }
×
322
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
323
        if err != nil {
1✔
324
                return res, err
×
325
        }
×
326

327
        handlePopulatedPvc := func() error {
2✔
328
                if pvc != nil {
2✔
329
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
330
                                return err
×
331
                        }
×
332
                }
333
                importSucceeded = true
1✔
334
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
335
                        return err
×
336
                }
×
337

338
                return nil
1✔
339
        }
340

341
        switch {
1✔
342
        case dv != nil:
1✔
343
                switch dv.Status.Phase {
1✔
344
                case cdiv1.Succeeded:
1✔
345
                        if err := handlePopulatedPvc(); err != nil {
1✔
346
                                return res, err
×
347
                        }
×
348
                case cdiv1.ImportScheduled:
1✔
349
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
350
                case cdiv1.ImportInProgress:
1✔
351
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
352
                default:
1✔
353
                        dvPhase := string(dv.Status.Phase)
1✔
354
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
355
                }
356
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
357
                // TODO: with plain populator PVCs (no DataVolumes) we may need to wait for corev1.Bound
1✔
358
                if err := handlePopulatedPvc(); err != nil {
1✔
359
                        return res, err
×
360
                }
×
361
        case snapshot != nil:
1✔
362
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
363
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
364
                                return res, err
×
365
                        }
×
366
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
367
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
368
                }
369
                // Below k8s 1.29 there's no way to know the source volume mode
370
                // Let's at least expose this info on our own snapshots
371
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
372
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
373
                        if err != nil {
1✔
374
                                return res, err
×
375
                        }
×
376
                        if volMode != nil {
2✔
377
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
378
                        }
1✔
379
                }
380
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
381
                        return res, err
×
382
                }
×
383
                importSucceeded = true
1✔
384
        default:
1✔
385
                if len(imports) > 0 {
2✔
386
                        imports = imports[1:]
1✔
387
                        dataImportCron.Status.CurrentImports = imports
1✔
388
                }
1✔
389
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
390
        }
391

392
        if importSucceeded {
2✔
393
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
394
                        return res, err
×
395
                }
×
396
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
397
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
398
                        return res, err
×
399
                }
×
400
        }
401

402
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
403
                return res, err
×
404
        }
×
405

406
        // Skip if schedule is disabled
407
        if isImageStreamSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
408
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
409
                pollRes, err := r.pollImageStreamDigest(ctx, dataImportCron)
1✔
410
                if err != nil {
2✔
411
                        return pollRes, err
1✔
412
                }
1✔
413
                res = pollRes
1✔
414
        }
415

416
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
417
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
418
        if digestUpdated {
2✔
419
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
420
                if dv != nil {
1✔
421
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
422
                                return res, err
×
423
                        }
×
424
                }
425
                if importSucceeded || len(imports) == 0 {
2✔
426
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
427
                                return res, err
1✔
428
                        }
1✔
429
                }
430
        } else if importSucceeded {
2✔
431
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
432
                        return res, err
×
433
                }
×
434
        } else if len(imports) > 0 {
2✔
435
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
436
        } else {
2✔
437
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
438
        }
1✔
439

440
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
441
                return res, err
×
442
        }
×
443

444
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
445
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
446
                        return res, err
×
447
                }
×
448
        }
449
        return res, nil
1✔
450
}
451

452
// Returns the current import DV if exists, and the last imported PVC
453
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
454
        imports := cron.Status.CurrentImports
1✔
455
        if len(imports) == 0 {
2✔
456
                return nil, nil, nil
1✔
457
        }
1✔
458

459
        dvName := imports[0].DataVolumeName
1✔
460
        dv := &cdiv1.DataVolume{}
1✔
461
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
462
                if !k8serrors.IsNotFound(err) {
1✔
463
                        return nil, nil, err
×
464
                }
×
465
                dv = nil
1✔
466
        }
467

468
        pvc := &corev1.PersistentVolumeClaim{}
1✔
469
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
470
                if !k8serrors.IsNotFound(err) {
1✔
471
                        return nil, nil, err
×
472
                }
×
473
                pvc = nil
1✔
474
        }
475
        return dv, pvc, nil
1✔
476
}
477

478
// Returns the current import DV if exists, and the last imported PVC
479
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
480
        imports := cron.Status.CurrentImports
1✔
481
        if len(imports) == 0 {
2✔
482
                return nil, nil
1✔
483
        }
1✔
484

485
        snapName := imports[0].DataVolumeName
1✔
486
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
487
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
488
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
489
                        return nil, err
×
490
                }
×
491
                return nil, nil
1✔
492
        }
493

494
        return snapshot, nil
1✔
495
}
496

497
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
498
        objCopy := obj.DeepCopyObject()
1✔
499
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
500
        r.setDataImportCronResourceLabels(cron, obj)
1✔
501
        if !reflect.DeepEqual(obj, objCopy) {
2✔
502
                if err := r.client.Update(ctx, obj); err != nil {
1✔
503
                        return err
×
504
                }
×
505
        }
506
        return nil
1✔
507
}
508

509
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
510
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
511
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
512
                if cond.Status == corev1.ConditionFalse && cond.Reason == common.GenericError {
×
513
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
514
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
515
                        dv.Labels[common.DataImportCronLabel] = ""
×
516
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
517
                                return err
×
518
                        }
×
519
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
520
                                return err
×
521
                        }
×
522
                        cron.Status.CurrentImports = nil
×
523
                }
524
        }
525
        return nil
×
526
}
527

528
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
529
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
530
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
531
        if err != nil {
1✔
532
                return err
×
533
        }
×
534
        if regSource.ImageStream == nil {
1✔
535
                return nil
×
536
        }
×
537
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
538
        if err != nil {
2✔
539
                return err
1✔
540
        }
1✔
541
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
542
        if err != nil {
2✔
543
                return err
1✔
544
        }
1✔
545
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
546
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
547
                log.Info("Updating DataImportCron", "digest", digest)
1✔
548
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
549
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
550
        }
1✔
551
        return nil
1✔
552
}
553

554
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
555
        log := r.log.WithName("updateDataSource")
1✔
556
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
557
        dataSource := &cdiv1.DataSource{}
1✔
558
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
559
                if k8serrors.IsNotFound(err) {
2✔
560
                        dataSource = r.newDataSource(dataImportCron)
1✔
561
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
562
                                return err
×
563
                        }
×
564
                        log.Info("DataSource created", "name", dataSourceName, "uid", dataSource.UID)
1✔
565
                } else {
×
566
                        return err
×
567
                }
×
568
        }
569
        if dataSource.Labels[common.DataImportCronLabel] == "" {
1✔
570
                log.Info("DataSource has no DataImportCron label, so it is not updated", "name", dataSourceName, "uid", dataSource.UID)
×
571
                return nil
×
572
        }
×
573
        dataSourceCopy := dataSource.DeepCopy()
1✔
574
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
575

1✔
576
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
577
                passCronLabelToDataSource(dataImportCron, dataSource, defaultInstanceTypeLabel)
1✔
578
        }
1✔
579

580
        passCronLabelToDataSource(dataImportCron, dataSource, cc.LabelDynamicCredentialSupport)
1✔
581

1✔
582
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
583
        populateDataSource(format, dataSource, sourcePVC)
1✔
584

1✔
585
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
586
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
587
                        return err
×
588
                }
×
589
        }
590

591
        return nil
1✔
592
}
593

594
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
595
        if sourcePVC == nil {
2✔
596
                return
1✔
597
        }
1✔
598

599
        switch format {
1✔
600
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
601
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
602
                        PVC: sourcePVC,
1✔
603
                }
1✔
604
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
605
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
606
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
607
                                Namespace: sourcePVC.Namespace,
1✔
608
                                Name:      sourcePVC.Name,
1✔
609
                        },
1✔
610
                }
1✔
611
        }
612
}
613

614
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
615
        if dataImportCron.Status.CurrentImports == nil {
1✔
616
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
617
        }
×
618
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
619
                Namespace: dataImportCron.Namespace,
1✔
620
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
621
        }
1✔
622
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
623
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
624
                now := metav1.Now()
1✔
625
                dataImportCron.Status.LastImportTimestamp = &now
1✔
626
        }
1✔
627
        return nil
1✔
628
}
629

630
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
631
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
632
        if lastTimeStr == "" {
2✔
633
                return nil
1✔
634
        }
1✔
635
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
636
        if err != nil {
1✔
637
                return err
×
638
        }
×
639
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
640
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
641
        }
1✔
642
        return nil
1✔
643
}
644

645
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
646
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
647
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
648
        if digest == "" {
1✔
649
                return nil
×
650
        }
×
651
        dvName, err := createDvName(dataSourceName, digest)
1✔
652
        if err != nil {
2✔
653
                return err
1✔
654
        }
1✔
655
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
656

1✔
657
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
658
        for _, src := range sources {
2✔
659
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
660
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
661
                                return err
×
662
                        }
×
663
                } else {
1✔
664
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
665
                                return err
×
666
                        }
×
667
                        // If source exists don't create DV
668
                        return nil
1✔
669
                }
670
        }
671

672
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
673
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
674
                return err
×
675
        }
×
676

677
        return nil
1✔
678
}
679

680
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
681
        switch format {
1✔
682
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
683
                return nil
1✔
684
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
685
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
686
        default:
×
687
                return fmt.Errorf("unknown source format for snapshot")
×
688
        }
689
}
690

691
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
692
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
693
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
694
                return nil
1✔
695
        }
1✔
696
        storageProfile := &cdiv1.StorageProfile{}
1✔
697
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
698
                return err
×
699
        }
×
700
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
701
        if err != nil {
1✔
702
                return err
×
703
        }
×
704
        labels := map[string]string{
1✔
705
                common.CDILabelKey:       common.CDILabelValue,
1✔
706
                common.CDIComponentLabel: "",
1✔
707
        }
1✔
708
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
709
                ObjectMeta: metav1.ObjectMeta{
1✔
710
                        Name:      pvc.Name,
1✔
711
                        Namespace: dataImportCron.Namespace,
1✔
712
                        Labels:    labels,
1✔
713
                },
1✔
714
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
715
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
716
                                PersistentVolumeClaimName: &pvc.Name,
1✔
717
                        },
1✔
718
                        VolumeSnapshotClassName: &className,
1✔
719
                },
1✔
720
        }
1✔
721
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
722

1✔
723
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
724
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
725
                if !k8serrors.IsNotFound(err) {
1✔
726
                        return err
×
727
                }
×
728
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
729
                if pvc.Spec.VolumeMode != nil {
2✔
730
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
731
                }
1✔
732
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
733
                        return err
×
734
                }
×
735
        } else {
1✔
736
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
737
                        // Clean up DV/PVC as they are not needed anymore
1✔
738
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
739
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
740
                                return err
×
741
                        }
×
742
                }
743
        }
744

745
        return nil
1✔
746
}
747

748
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
749
        dataImportCron.Status.SourceFormat = &format
1✔
750

1✔
751
        switch format {
1✔
752
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
753
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
754
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
755
                if snapshot == nil {
2✔
756
                        // Snapshot create/update will trigger reconcile
1✔
757
                        return nil
1✔
758
                }
1✔
759
                if cc.IsSnapshotReady(snapshot) {
2✔
760
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
761
                } else {
2✔
762
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
763
                }
1✔
764
        default:
×
765
                return fmt.Errorf("unknown source format for snapshot")
×
766
        }
767

768
        return nil
1✔
769
}
770

771
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
772
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
773
        if desiredStorageClass == nil {
2✔
774
                return format, nil
1✔
775
        }
1✔
776

777
        storageProfile := &cdiv1.StorageProfile{}
1✔
778
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
779
                return format, err
×
780
        }
×
781
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
782
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
783
        }
1✔
784

785
        return format, nil
1✔
786
}
787

788
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
789
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
790
                return nil
×
791
        }
×
792
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
793
        if err != nil {
1✔
794
                return err
×
795
        }
×
796

797
        maxImports := defaultImportsToKeepPerCron
1✔
798

1✔
799
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
800
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
801
        }
1✔
802

803
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
804
                return err
×
805
        }
×
806
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
807
                return err
×
808
        }
×
809

810
        return nil
1✔
811
}
812

813
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
814
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
815

1✔
816
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
817
                return err
×
818
        }
×
819
        if len(pvcList.Items) > maxImports {
2✔
820
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
821
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
822
                })
1✔
823
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
824
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
825
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
826
                                return err
×
827
                        }
×
828
                }
829
        }
830

831
        dvList := &cdiv1.DataVolumeList{}
1✔
832
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
833
                return err
×
834
        }
×
835

836
        if len(dvList.Items) > maxImports {
2✔
837
                for _, dv := range dvList.Items {
2✔
838
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
839
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
840
                                return err
×
841
                        }
×
842

843
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
844
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
845
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
846
                                        return err
×
847
                                }
×
848
                        }
849
                }
850
        }
851

852
        return nil
1✔
853
}
854

855
// deleteDvPvc deletes DV or PVC if DV was GCed
856
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
857
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
858
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
859
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
860
                return err
1✔
861
        }
1✔
862
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
863
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
864
                return err
×
865
        }
×
866
        return nil
1✔
867
}
868

869
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
870
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
871

1✔
872
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
873
                if meta.IsNoMatchError(err) {
×
874
                        return nil
×
875
                }
×
876
                return err
×
877
        }
878
        if len(snapList.Items) > maxImports {
1✔
879
                sort.Slice(snapList.Items, func(i, j int) bool {
×
880
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
881
                })
×
882
                for _, snap := range snapList.Items[maxImports:] {
×
883
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
884
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
885
                                return err
×
886
                        }
×
887
                }
888
        }
889

890
        return nil
1✔
891
}
892

893
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
894
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
895
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
896

1✔
897
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
898
                return err
×
899
        }
×
900
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
901
        if err != nil {
1✔
902
                return err
×
903
        }
×
904
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
905
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
906
                return err
×
907
        }
×
908
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
909
                return err
×
910
        }
×
911
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
912
                return err
×
913
        }
×
914
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
915
                return err
×
916
        }
×
917
        return nil
1✔
918
}
919

920
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
921
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
922
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
923
        if err != nil {
1✔
924
                return err
×
925
        }
×
926
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
927
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
928
                return err
×
929
        }
×
930
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
931
                return err
×
932
        }
×
933

934
        return nil
1✔
935
}
936

937
// NewDataImportCronController creates a new instance of the DataImportCron controller
938
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
939
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
940
                Scheme: mgr.GetScheme(),
×
941
                Mapper: mgr.GetRESTMapper(),
×
942
        })
×
943
        if err != nil {
×
944
                return nil, err
×
945
        }
×
946
        reconciler := &DataImportCronReconciler{
×
947
                client:          mgr.GetClient(),
×
948
                uncachedClient:  uncachedClient,
×
949
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
950
                scheme:          mgr.GetScheme(),
×
951
                log:             log.WithName(dataImportControllerName),
×
952
                image:           importerImage,
×
953
                pullPolicy:      pullPolicy,
×
954
                cdiNamespace:    util.GetNamespace(),
×
955
                installerLabels: installerLabels,
×
956
        }
×
957
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
958
                MaxConcurrentReconciles: 3,
×
959
                Reconciler:              reconciler,
×
960
        })
×
961
        if err != nil {
×
962
                return nil, err
×
963
        }
×
964
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
965
                return nil, err
×
966
        }
×
967
        log.Info("Initialized DataImportCron controller")
×
968
        return dataImportCronController, nil
×
969
}
970

971
func getCronName(obj client.Object) string {
×
972
        return obj.GetLabels()[common.DataImportCronLabel]
×
973
}
×
974

975
func getCronNs(obj client.Object) string {
×
976
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
977
}
×
978

979
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
980
        if cronName := getCronName(obj); cronName != "" {
×
981
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
982
        }
×
983
        return nil
×
984
}
985

986
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
987
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
988
                return err
×
989
        }
×
990

991
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
992
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
993
                // Otherwise we risk losing the storage profile event
×
994
                var crons cdiv1.DataImportCronList
×
995
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
996
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
997
                        return nil
×
998
                }
×
999
                // Storage profiles are 1:1 to storage classes
1000
                scName := obj.GetName()
×
1001
                var reqs []reconcile.Request
×
1002
                for _, cron := range crons.Items {
×
1003
                        dataVolume := cron.Spec.Template
×
1004
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1005
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1006
                        if err != nil || templateSc == nil {
×
1007
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1008
                                return reqs
×
1009
                        }
×
1010
                        if templateSc.Name == scName {
×
1011
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1012
                        }
×
1013
                }
1014
                return reqs
×
1015
        }
1016

1017
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1018
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1019
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1020
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1021
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1022
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1023
                },
1024
        )); err != nil {
×
1025
                return err
×
1026
        }
×
1027

1028
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1029
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1030
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1031
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1032
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1033
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1034
                },
1035
        )); err != nil {
×
1036
                return err
×
1037
        }
×
1038

1039
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1040
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1041
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1042
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1043
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1044
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1045
                },
1046
        )); err != nil {
×
1047
                return err
×
1048
        }
×
1049

1050
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1051
                return err
×
1052
        }
×
1053

1054
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1055
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1056
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1057
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1058
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1059
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1060
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
1061
                        },
×
1062
                },
1063
        )); err != nil {
×
1064
                return err
×
1065
        }
×
1066

1067
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1068
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1069
        }
×
1070

1071
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1072
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1073
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1074
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1075
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1076
                        },
×
1077
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1078
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1079
                },
1080
        )); err != nil {
×
1081
                return err
×
1082
        }
×
1083

1084
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1085
                if meta.IsNoMatchError(err) {
×
1086
                        // Back out if there's no point to attempt watch
×
1087
                        return nil
×
1088
                }
×
1089
                if !cc.IsErrCacheNotStarted(err) {
×
1090
                        return err
×
1091
                }
×
1092
        }
1093
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1094
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1095
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1096
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1097
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1098
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1099
                },
1100
        )); err != nil {
×
1101
                return err
×
1102
        }
×
1103

1104
        return nil
×
1105
}
1106

1107
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1108
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1109
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1110
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1111
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1112
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1113
                                log.Info("Update", "sc", obj.GetName(),
×
1114
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1115
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1116
                                reqs, err := getReconcileRequestsForDicsWithPendingPvc(ctx, mgr.GetClient())
×
1117
                                if err != nil {
×
1118
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1119
                                }
×
1120
                                return reqs
×
1121
                        },
1122
                ),
1123
                predicate.TypedFuncs[*storagev1.StorageClass]{
1124
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1125
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1126
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1127
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1128
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1129
                        },
×
1130
                },
1131
        )); err != nil {
×
1132
                return err
×
1133
        }
×
1134

1135
        return nil
×
1136
}
1137

1138
func getReconcileRequestsForDicsWithPendingPvc(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1139
        dicList := &cdiv1.DataImportCronList{}
×
1140
        if err := c.List(ctx, dicList); err != nil {
×
1141
                return nil, err
×
1142
        }
×
1143
        reqs := []reconcile.Request{}
×
1144
        for _, dic := range dicList.Items {
×
1145
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1146
                        continue
×
1147
                }
1148

1149
                imports := dic.Status.CurrentImports
×
1150
                if len(imports) == 0 {
×
1151
                        continue
×
1152
                }
1153

1154
                pvcName := imports[0].DataVolumeName
×
1155
                pvc := &corev1.PersistentVolumeClaim{}
×
1156
                if err := c.Get(ctx, types.NamespacedName{Namespace: dic.Namespace, Name: pvcName}, pvc); err != nil {
×
1157
                        if k8serrors.IsNotFound(err) {
×
1158
                                continue
×
1159
                        }
1160
                        return nil, err
×
1161
                }
1162

1163
                if pvc.Status.Phase == corev1.ClaimPending {
×
1164
                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1165
                }
×
1166
        }
1167

1168
        return reqs, nil
×
1169
}
1170

1171
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1172
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1173
                return false, nil
1✔
1174
        }
1✔
1175

1176
        sc := pvc.Spec.StorageClassName
1✔
1177
        if sc == nil || *sc == desiredStorageClass {
2✔
1178
                return false, nil
1✔
1179
        }
1✔
1180

1181
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1182
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1183
                return false, err
×
1184
        }
×
1185

1186
        return true, nil
1✔
1187
}
1188

1189
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1190
        cronJob := &batchv1.CronJob{}
1✔
1191
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1192
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1193
                return false, cc.IgnoreNotFound(err)
1✔
1194
        }
1✔
1195

1196
        cronJobCopy := cronJob.DeepCopy()
1✔
1197
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1198
                return false, err
×
1199
        }
×
1200

1201
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1202
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1203
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1204
                        return false, cc.IgnoreNotFound(err)
×
1205
                }
×
1206
        }
1207
        return true, nil
1✔
1208
}
1209

1210
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1211
        cronJob := &batchv1.CronJob{
1✔
1212
                ObjectMeta: metav1.ObjectMeta{
1✔
1213
                        Name:      GetCronJobName(cron),
1✔
1214
                        Namespace: r.cdiNamespace,
1✔
1215
                },
1✔
1216
        }
1✔
1217
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1218
                return nil, err
×
1219
        }
×
1220
        return cronJob, nil
1✔
1221
}
1222

1223
// InitPollerPodSpec inits poller PodSpec
1224
func InitPollerPodSpec(c client.Client, cron *cdiv1.DataImportCron, podSpec *corev1.PodSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1225
        regSource, err := getCronRegistrySource(cron)
1✔
1226
        if err != nil {
1✔
1227
                return err
×
1228
        }
×
1229
        if regSource.URL == nil {
1✔
1230
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1231
        }
×
1232
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1233
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1234
                return err
×
1235
        }
×
1236
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1237
        if err != nil {
1✔
1238
                return err
×
1239
        }
×
1240
        container := corev1.Container{
1✔
1241
                Name:  "cdi-source-update-poller",
1✔
1242
                Image: image,
1✔
1243
                Command: []string{
1✔
1244
                        "/usr/bin/cdi-source-update-poller",
1✔
1245
                        "-ns", cron.Namespace,
1✔
1246
                        "-cron", cron.Name,
1✔
1247
                        "-url", *regSource.URL,
1✔
1248
                },
1✔
1249
                ImagePullPolicy:          pullPolicy,
1✔
1250
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1251
                TerminationMessagePolicy: corev1.TerminationMessageReadFile,
1✔
1252
        }
1✔
1253

1✔
1254
        var volumes []corev1.Volume
1✔
1255
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1256
        if hasCertConfigMap {
1✔
1257
                vm := corev1.VolumeMount{
×
1258
                        Name:      CertVolName,
×
1259
                        MountPath: common.ImporterCertDir,
×
1260
                }
×
1261
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1262
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1263
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1264
        }
×
1265

1266
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1267
                vm := corev1.VolumeMount{
1✔
1268
                        Name:      ProxyCertVolName,
1✔
1269
                        MountPath: common.ImporterProxyCertDir,
1✔
1270
                }
1✔
1271
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1272
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1273
        }
1✔
1274

1275
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1276
                container.Env = append(container.Env,
×
1277
                        corev1.EnvVar{
×
1278
                                Name: common.ImporterAccessKeyID,
×
1279
                                ValueFrom: &corev1.EnvVarSource{
×
1280
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1281
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1282
                                                        Name: *regSource.SecretRef,
×
1283
                                                },
×
1284
                                                Key: common.KeyAccess,
×
1285
                                        },
×
1286
                                },
×
1287
                        },
×
1288
                        corev1.EnvVar{
×
1289
                                Name: common.ImporterSecretKey,
×
1290
                                ValueFrom: &corev1.EnvVarSource{
×
1291
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1292
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1293
                                                        Name: *regSource.SecretRef,
×
1294
                                                },
×
1295
                                                Key: common.KeySecret,
×
1296
                                        },
×
1297
                                },
×
1298
                        },
×
1299
                )
×
1300
        }
×
1301

1302
        addEnvVar := func(varName, value string) {
2✔
1303
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1304
        }
1✔
1305

1306
        if insecureTLS {
1✔
1307
                addEnvVar(common.InsecureTLSVar, "true")
×
1308
        }
×
1309

1310
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1311
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1312
                        addEnvVar(varName, value)
1✔
1313
                }
1✔
1314
        }
1315

1316
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1317
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1318
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1319

1✔
1320
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1321
        if err != nil {
1✔
1322
                return err
×
1323
        }
×
1324
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1325
        if err != nil {
1✔
1326
                return err
×
1327
        }
×
1328

1329
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1330
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1331
        podSpec.Containers = []corev1.Container{container}
1✔
1332
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1333
        podSpec.Volumes = volumes
1✔
1334
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1335
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1336
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1337
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1338

1✔
1339
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1340

1✔
1341
        return nil
1✔
1342
}
1343

1344
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1345
        cronJobSpec := &cronJob.Spec
1✔
1346
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1347
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1348
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1349
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1350

1✔
1351
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1352
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1353
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1354

1✔
1355
        podSpec := &jobSpec.Template.Spec
1✔
1356
        if err := InitPollerPodSpec(r.client, cron, podSpec, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1357
                return err
×
1358
        }
×
1359
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1360
                return err
×
1361
        }
×
1362
        return nil
1✔
1363
}
1364

1365
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1366
        job := &batchv1.Job{
1✔
1367
                ObjectMeta: metav1.ObjectMeta{
1✔
1368
                        Name:      GetInitialJobName(cron),
1✔
1369
                        Namespace: cronJob.Namespace,
1✔
1370
                },
1✔
1371
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1372
        }
1✔
1373
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1374
                return nil, err
×
1375
        }
×
1376
        return job, nil
1✔
1377
}
1378

1379
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1380
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1381
                return err
×
1382
        }
×
1383
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1384
        labels := obj.GetLabels()
1✔
1385
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1386
        labels[common.DataImportCronLabel] = cron.Name
1✔
1387
        obj.SetLabels(labels)
1✔
1388
        return nil
1✔
1389
}
1390

1391
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1392
        var digestedURL string
1✔
1393
        dv := cron.Spec.Template.DeepCopy()
1✔
1394
        if isURLSource(cron) {
2✔
1395
                digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1396
        } else if isImageStreamSource(cron) {
3✔
1397
                // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1398
                digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1399
                dv.Spec.Source.Registry.ImageStream = nil
1✔
1400
        }
1✔
1401
        dv.Spec.Source.Registry.URL = &digestedURL
1✔
1402
        dv.Name = dataVolumeName
1✔
1403
        dv.Namespace = cron.Namespace
1✔
1404
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1405
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1406
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1407
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1408

1✔
1409
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1410
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1411
        }
1✔
1412

1413
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1414

1✔
1415
        return dv
1✔
1416
}
1417

1418
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1419
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1420
        labels := obj.GetLabels()
1✔
1421
        labels[common.DataImportCronLabel] = cron.Name
1✔
1422
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1423
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1424
        }
1✔
1425
        obj.SetLabels(labels)
1✔
1426
}
1427

1428
func untagDigestedDockerURL(dockerURL string) string {
1✔
1429
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1430
                url := u.Host + u.Path
1✔
1431
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1432
                // Check for tag
1✔
1433
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1434
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1435
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1436
                        }
1✔
1437
                }
1438
        }
1439
        return dockerURL
1✔
1440
}
1441

1442
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1443
        if val := cron.Labels[ann]; val != "" {
2✔
1444
                cc.AddLabel(dv, ann, val)
1✔
1445
        }
1✔
1446
}
1447

1448
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1449
        if val := cron.Annotations[ann]; val != "" {
1✔
1450
                cc.AddAnnotation(dv, ann, val)
×
1451
        }
×
1452
}
1453

1454
func passCronLabelToDataSource(cron *cdiv1.DataImportCron, ds *cdiv1.DataSource, ann string) {
1✔
1455
        if val := cron.Labels[ann]; val != "" {
2✔
1456
                cc.AddLabel(ds, ann, val)
1✔
1457
        }
1✔
1458
}
1459

1460
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1461
        dataSource := &cdiv1.DataSource{
1✔
1462
                ObjectMeta: metav1.ObjectMeta{
1✔
1463
                        Name:      cron.Spec.ManagedDataSource,
1✔
1464
                        Namespace: cron.Namespace,
1✔
1465
                },
1✔
1466
        }
1✔
1467
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1468
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1469
        return dataSource
1✔
1470
}
1✔
1471

1472
// Create DataVolume name based on the DataSource name + prefix of the digest sha256
1473
func createDvName(prefix, digest string) (string, error) {
1✔
1474
        fromIdx := len(digestPrefix)
1✔
1475
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1476
        if !strings.HasPrefix(digest, digestPrefix) {
2✔
1477
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1478
        }
1✔
1479
        if len(digest) < toIdx {
2✔
1480
                return "", errors.Errorf("Digest is too short")
1✔
1481
        }
1✔
1482
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1483
}
1484

1485
// GetCronJobName get CronJob name based on cron name and UID
1486
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1487
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1488
}
1✔
1489

1490
// GetInitialJobName get initial job name based on cron name and UID
1491
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1492
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1493
}
1✔
1494

1495
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1496
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1497
}
1✔
1498

1499
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1500
        dv := &cron.Spec.Template
1✔
1501

1✔
1502
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1503
                return explicitVolumeMode, nil
×
1504
        }
×
1505

1506
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1507
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1508
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1509
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1510
                        AccessModes:      accessModes,
1✔
1511
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1512
                        Resources: corev1.VolumeResourceRequirements{
1✔
1513
                                Requests: corev1.ResourceList{
1✔
1514
                                        // Doesn't matter
1✔
1515
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1516
                                },
1✔
1517
                        },
1✔
1518
                },
1✔
1519
        }
1✔
1520
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1521
                return nil, err
×
1522
        }
×
1523

1524
        return inferredPvc.Spec.VolumeMode, nil
1✔
1525
}
1526

1527
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1528
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1529
        if dv.Spec.PVC != nil {
1✔
1530
                return dv.Spec.PVC.VolumeMode
×
1531
        }
×
1532

1533
        if dv.Spec.Storage != nil {
2✔
1534
                return dv.Spec.Storage.VolumeMode
1✔
1535
        }
1✔
1536

1537
        return nil
×
1538
}
1539

1540
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1541
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1542
        if dv.Spec.PVC != nil {
1✔
1543
                return dv.Spec.PVC.AccessModes
×
1544
        }
×
1545

1546
        if dv.Spec.Storage != nil {
2✔
1547
                return dv.Spec.Storage.AccessModes
1✔
1548
        }
1✔
1549

1550
        return nil
×
1551
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc