• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4794

14 Jul 2024 06:12PM UTC coverage: 58.983% (+0.01%) from 58.972%
#4794

push

travis-ci

web-flow
update to k8s 1.30 libs and controller-runtime 0.18.4 (#3336)

* make deps-update

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* ResourceRequirements -> VolumeResourceRequirements

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* fix calls to controller.Watch()

controller-runtime changed the API!

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* Fix errors with actual openshift/library-go lib

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* make all works now and everything compiles

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* fix "make update-codegen" because generate_groups.sh deprecated

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* run "make generate"

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* fix transfer unittest because of change to controller-runtime

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

---------

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

6 of 238 new or added lines in 24 files covered. (2.52%)

10 existing lines in 4 files now uncovered.

16454 of 27896 relevant lines covered (58.98%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.68
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        "github.com/pkg/errors"
33
        cronexpr "github.com/robfig/cron/v3"
34

35
        batchv1 "k8s.io/api/batch/v1"
36
        corev1 "k8s.io/api/core/v1"
37
        storagev1 "k8s.io/api/storage/v1"
38
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
39
        "k8s.io/apimachinery/pkg/api/meta"
40
        "k8s.io/apimachinery/pkg/api/resource"
41
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42
        "k8s.io/apimachinery/pkg/labels"
43
        "k8s.io/apimachinery/pkg/runtime"
44
        "k8s.io/apimachinery/pkg/types"
45
        "k8s.io/client-go/tools/record"
46
        "k8s.io/utils/ptr"
47

48
        "sigs.k8s.io/controller-runtime/pkg/client"
49
        "sigs.k8s.io/controller-runtime/pkg/controller"
50
        "sigs.k8s.io/controller-runtime/pkg/event"
51
        "sigs.k8s.io/controller-runtime/pkg/handler"
52
        "sigs.k8s.io/controller-runtime/pkg/manager"
53
        "sigs.k8s.io/controller-runtime/pkg/predicate"
54
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
55
        "sigs.k8s.io/controller-runtime/pkg/source"
56

57
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
58
        "kubevirt.io/containerized-data-importer/pkg/common"
59
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
60
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
61
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
62
        "kubevirt.io/containerized-data-importer/pkg/operator"
63
        "kubevirt.io/containerized-data-importer/pkg/util"
64
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
65
)
66

67
const (
	// ErrDataSourceAlreadyManaged is the event reason emitted when the managed
	// DataSource already belongs to a different DataImportCron.
	ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
	// MessageDataSourceAlreadyManaged is the format string of the matching event
	// message; its verbs are the DataSource name and the owning DataImportCron name.
	MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
)
73

74
// DataImportCronReconciler members
75
type DataImportCronReconciler struct {
76
        client          client.Client
77
        uncachedClient  client.Client
78
        recorder        record.EventRecorder
79
        scheme          *runtime.Scheme
80
        log             logr.Logger
81
        image           string
82
        pullPolicy      string
83
        cdiNamespace    string
84
        installerLabels map[string]string
85
}
86

87
const (
88
        // AnnSourceDesiredDigest is the digest of the pending updated image
89
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
90
        // AnnImageStreamDockerRef is the ImageStream Docker reference
91
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
92
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
93
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
94
        // AnnLastCronTime is the cron last execution time stamp
95
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
96
        // AnnLastUseTime is the PVC last use time stamp
97
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
98
        // AnnStorageClass is the cron DV's storage class
99
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
100

101
        dataImportControllerName    = "dataimportcron-controller"
102
        digestPrefix                = "sha256:"
103
        digestDvNameSuffixLength    = 12
104
        cronJobUIDSuffixLength      = 8
105
        defaultImportsToKeepPerCron = 3
106
)
107

108
// Reconcile loop for DataImportCronReconciler
109
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
110
        dataImportCron := &cdiv1.DataImportCron{}
1✔
111
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
112
                return reconcile.Result{}, err
1✔
113
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
114
                err := r.cleanup(ctx, req.NamespacedName)
1✔
115
                return reconcile.Result{}, err
1✔
116
        }
1✔
117
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
118
        if !shouldReconcile || err != nil {
1✔
119
                return reconcile.Result{}, err
×
120
        }
×
121

122
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
123
                return reconcile.Result{}, err
×
124
        }
×
125

126
        return r.update(ctx, dataImportCron)
1✔
127
}
128

129
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
130
        dataSource := &cdiv1.DataSource{}
1✔
131
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
132
                if k8serrors.IsNotFound(err) {
2✔
133
                        return true, nil
1✔
134
                }
1✔
135
                return false, err
×
136
        }
137
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
138
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
139
                return true, nil
1✔
140
        }
1✔
141
        otherCron := &cdiv1.DataImportCron{}
1✔
142
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
143
                if k8serrors.IsNotFound(err) {
2✔
144
                        return true, nil
1✔
145
                }
1✔
146
                return false, err
×
147
        }
148
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
149
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
150
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
151
                r.log.V(3).Info(msg)
1✔
152
                return false, nil
1✔
153
        }
1✔
154
        return true, nil
×
155
}
156

157
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
158
        if dataImportCron.Spec.Schedule == "" {
2✔
159
                return nil
1✔
160
        }
1✔
161
        if isImageStreamSource(dataImportCron) {
2✔
162
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
163
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
164
                }
1✔
165
                return nil
1✔
166
        }
167
        if !isURLSource(dataImportCron) {
1✔
168
                return nil
×
169
        }
×
170
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
171
        if exists || err != nil {
2✔
172
                return err
1✔
173
        }
1✔
174
        cronJob, err := r.newCronJob(dataImportCron)
1✔
175
        if err != nil {
1✔
176
                return err
×
177
        }
×
178
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
179
                return err
×
180
        }
×
181
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
182
        if err != nil {
1✔
183
                return err
×
184
        }
×
185
        if err := r.client.Create(ctx, job); err != nil {
1✔
186
                return err
×
187
        }
×
188
        return nil
1✔
189
}
190

191
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
192
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
193
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
194
        }
×
195
        imageStream := &imagev1.ImageStream{}
1✔
196
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
197
        if err != nil {
2✔
198
                return nil, "", err
1✔
199
        }
1✔
200
        imageStreamNamespacedName := types.NamespacedName{
1✔
201
                Namespace: imageStreamNamespace,
1✔
202
                Name:      name,
1✔
203
        }
1✔
204
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
205
                return nil, "", err
1✔
206
        }
1✔
207
        return imageStream, tag, nil
1✔
208
}
209

210
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
211
        if imageStream == nil {
1✔
212
                return "", "", errors.Errorf("No ImageStream")
×
213
        }
×
214
        tags := imageStream.Status.Tags
1✔
215
        if len(tags) == 0 {
1✔
216
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
217
        }
×
218

219
        tagIdx := 0
1✔
220
        if len(imageStreamTag) > 0 {
2✔
221
                tagIdx = -1
1✔
222
                for i, tag := range tags {
2✔
223
                        if tag.Tag == imageStreamTag {
2✔
224
                                tagIdx = i
1✔
225
                                break
1✔
226
                        }
227
                }
228
        }
229
        if tagIdx == -1 {
2✔
230
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
231
        }
1✔
232

233
        if len(tags[tagIdx].Items) == 0 {
2✔
234
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
235
        }
1✔
236
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
237
}
238

239
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
240
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
241
                return imageStreamName, "", nil
1✔
242
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
243
                return subs[0], subs[1], nil
1✔
244
        }
1✔
245
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
246
}
247

248
func (r *DataImportCronReconciler) pollImageStreamDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
249
        if nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]; nextTimeStr != "" {
2✔
250
                nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
251
                if err != nil {
1✔
252
                        return reconcile.Result{}, err
×
253
                }
×
254
                if nextTime.Before(time.Now()) {
2✔
255
                        if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
256
                                return reconcile.Result{}, err
1✔
257
                        }
1✔
258
                }
259
        }
260
        return r.setNextCronTime(dataImportCron)
1✔
261
}
262

263
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
264
        now := time.Now()
1✔
265
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
266
        if err != nil {
1✔
267
                return reconcile.Result{}, err
×
268
        }
×
269
        nextTime := expr.Next(now)
1✔
270
        requeueAfter := nextTime.Sub(now)
1✔
271
        res := reconcile.Result{Requeue: true, RequeueAfter: requeueAfter}
1✔
272
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
273
        return res, err
1✔
274
}
275

276
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
277
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
278
        return err == nil && regSource.ImageStream != nil
1✔
279
}
1✔
280

281
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
282
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
283
        return err == nil && regSource.URL != nil
1✔
284
}
1✔
285

286
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
287
        source := cron.Spec.Template.Spec.Source
1✔
288
        if source == nil || source.Registry == nil {
1✔
289
                return nil, errors.Errorf("Cron with no registry source %s", cron.Name)
×
290
        }
×
291
        return source.Registry, nil
1✔
292
}
293

294
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
295
        res := reconcile.Result{}
1✔
296

1✔
297
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
298
        if err != nil {
1✔
299
                return res, err
×
300
        }
×
301

302
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
303
        imports := dataImportCron.Status.CurrentImports
1✔
304
        importSucceeded := false
1✔
305

1✔
306
        dataVolume := dataImportCron.Spec.Template
1✔
307
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
308
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
309
        if err != nil {
1✔
310
                return res, err
×
311
        }
×
312
        if desiredStorageClass != nil {
2✔
313
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
314
                        return res, err
1✔
315
                }
1✔
316
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
317
        }
318
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
319
        if err != nil {
1✔
320
                return res, err
×
321
        }
×
322
        snapshot, err := r.getSnapshot(ctx, dataImportCron, format)
1✔
323
        if err != nil {
1✔
324
                return res, err
×
325
        }
×
326

327
        handlePopulatedPvc := func() error {
2✔
328
                if pvc != nil {
2✔
329
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
330
                                return err
×
331
                        }
×
332
                }
333
                importSucceeded = true
1✔
334
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
335
                        return err
×
336
                }
×
337

338
                return nil
1✔
339
        }
340

341
        if dv != nil {
2✔
342
                switch dv.Status.Phase {
1✔
343
                case cdiv1.Succeeded:
1✔
344
                        if err := handlePopulatedPvc(); err != nil {
1✔
345
                                return res, err
×
346
                        }
×
347
                case cdiv1.ImportScheduled:
1✔
348
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
349
                case cdiv1.ImportInProgress:
1✔
350
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
351
                default:
1✔
352
                        dvPhase := string(dv.Status.Phase)
1✔
353
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
354
                }
355
        } else if pvc != nil {
2✔
356
                // TODO: with plain populator PVCs (no DataVolumes) we may need to wait for corev1.Bound
1✔
357
                if err := handlePopulatedPvc(); err != nil {
1✔
358
                        return res, err
×
359
                }
×
360
        } else if snapshot != nil {
2✔
361
                // Below k8s 1.29 there's no way to know the source volume mode
1✔
362
                // Let's at least expose this info on our own snapshots
1✔
363
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
364
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
365
                        if err != nil {
1✔
366
                                return res, err
×
367
                        }
×
368
                        if volMode != nil {
2✔
369
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
370
                        }
1✔
371
                }
372
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
373
                        return res, err
×
374
                }
×
375
                importSucceeded = true
1✔
376
        } else {
1✔
377
                if len(imports) > 0 {
2✔
378
                        imports = imports[1:]
1✔
379
                        dataImportCron.Status.CurrentImports = imports
1✔
380
                }
1✔
381
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
382
        }
383

384
        if importSucceeded {
2✔
385
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
386
                        return res, err
×
387
                }
×
388
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
389
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
390
                        return res, err
×
391
                }
×
392
        }
393

394
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
395
                return res, err
×
396
        }
×
397

398
        // Skip if schedule is disabled
399
        if isImageStreamSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
400
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
401
                pollRes, err := r.pollImageStreamDigest(ctx, dataImportCron)
1✔
402
                if err != nil {
2✔
403
                        return pollRes, err
1✔
404
                }
1✔
405
                res = pollRes
1✔
406
        }
407

408
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
409
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
410
        if digestUpdated {
2✔
411
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
412
                if dv != nil {
1✔
413
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
414
                                return res, err
×
415
                        }
×
416
                }
417
                if importSucceeded || len(imports) == 0 {
2✔
418
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
419
                                return res, err
1✔
420
                        }
1✔
421
                }
422
        } else if importSucceeded {
2✔
423
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
424
                        return res, err
×
425
                }
×
426
        } else if len(imports) > 0 {
2✔
427
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
428
        } else {
2✔
429
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
430
        }
1✔
431

432
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
433
                return res, err
×
434
        }
×
435

436
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
437
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
438
                        return res, err
×
439
                }
×
440
        }
441
        return res, nil
1✔
442
}
443

444
// Returns the current import DV if exists, and the last imported PVC
445
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
446
        imports := cron.Status.CurrentImports
1✔
447
        if len(imports) == 0 {
2✔
448
                return nil, nil, nil
1✔
449
        }
1✔
450

451
        dvName := imports[0].DataVolumeName
1✔
452
        dv := &cdiv1.DataVolume{}
1✔
453
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
454
                if !k8serrors.IsNotFound(err) {
1✔
455
                        return nil, nil, err
×
456
                }
×
457
                dv = nil
1✔
458
        }
459

460
        pvc := &corev1.PersistentVolumeClaim{}
1✔
461
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
462
                if !k8serrors.IsNotFound(err) {
1✔
463
                        return nil, nil, err
×
464
                }
×
465
                pvc = nil
1✔
466
        }
467
        return dv, pvc, nil
1✔
468
}
469

470
// Returns the current import DV if exists, and the last imported PVC
471
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) (*snapshotv1.VolumeSnapshot, error) {
1✔
472
        if format != cdiv1.DataImportCronSourceFormatSnapshot {
2✔
473
                return nil, nil
1✔
474
        }
1✔
475

476
        imports := cron.Status.CurrentImports
1✔
477
        if len(imports) == 0 {
2✔
478
                return nil, nil
1✔
479
        }
1✔
480

481
        snapName := imports[0].DataVolumeName
1✔
482
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
483
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
484
                if !k8serrors.IsNotFound(err) {
1✔
485
                        return nil, err
×
486
                }
×
487
                return nil, nil
1✔
488
        }
489

490
        return snapshot, nil
1✔
491
}
492

493
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
494
        objCopy := obj.DeepCopyObject()
1✔
495
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
496
        r.setDataImportCronResourceLabels(cron, obj)
1✔
497
        if !reflect.DeepEqual(obj, objCopy) {
2✔
498
                if err := r.client.Update(ctx, obj); err != nil {
1✔
499
                        return err
×
500
                }
×
501
        }
502
        return nil
1✔
503
}
504

505
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
506
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
507
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
508
                if cond.Status == corev1.ConditionFalse && cond.Reason == common.GenericError {
×
509
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
510
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
511
                        dv.Labels[common.DataImportCronLabel] = ""
×
512
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
513
                                return err
×
514
                        }
×
515
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
516
                                return err
×
517
                        }
×
518
                        cron.Status.CurrentImports = nil
×
519
                }
520
        }
521
        return nil
×
522
}
523

524
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
525
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
526
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
527
        if err != nil {
1✔
528
                return err
×
529
        }
×
530
        if regSource.ImageStream == nil {
1✔
531
                return nil
×
532
        }
×
533
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
534
        if err != nil {
2✔
535
                return err
1✔
536
        }
1✔
537
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
538
        if err != nil {
2✔
539
                return err
1✔
540
        }
1✔
541
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
542
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
543
                log.Info("Updating DataImportCron", "digest", digest)
1✔
544
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
545
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
546
        }
1✔
547
        return nil
1✔
548
}
549

550
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
551
        log := r.log.WithName("updateDataSource")
1✔
552
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
553
        dataSource := &cdiv1.DataSource{}
1✔
554
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
555
                if k8serrors.IsNotFound(err) {
2✔
556
                        dataSource = r.newDataSource(dataImportCron)
1✔
557
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
558
                                return err
×
559
                        }
×
560
                        log.Info("DataSource created", "name", dataSourceName, "uid", dataSource.UID)
1✔
561
                } else {
×
562
                        return err
×
563
                }
×
564
        }
565
        if dataSource.Labels[common.DataImportCronLabel] == "" {
1✔
566
                log.Info("DataSource has no DataImportCron label, so it is not updated", "name", dataSourceName, "uid", dataSource.UID)
×
567
                return nil
×
568
        }
×
569
        dataSourceCopy := dataSource.DeepCopy()
1✔
570
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
571

1✔
572
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
573
                passCronLabelToDataSource(dataImportCron, dataSource, defaultInstanceTypeLabel)
1✔
574
        }
1✔
575

576
        passCronLabelToDataSource(dataImportCron, dataSource, cc.LabelDynamicCredentialSupport)
1✔
577

1✔
578
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
579
        populateDataSource(format, dataSource, sourcePVC)
1✔
580

1✔
581
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
582
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
583
                        return err
×
584
                }
×
585
        }
586

587
        return nil
1✔
588
}
589

590
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
591
        if sourcePVC == nil {
2✔
592
                return
1✔
593
        }
1✔
594

595
        switch format {
1✔
596
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
597
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
598
                        PVC: sourcePVC,
1✔
599
                }
1✔
600
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
601
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
602
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
603
                                Namespace: sourcePVC.Namespace,
1✔
604
                                Name:      sourcePVC.Name,
1✔
605
                        },
1✔
606
                }
1✔
607
        }
608
}
609

610
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
611
        if dataImportCron.Status.CurrentImports == nil {
1✔
612
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
613
        }
×
614
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
615
                Namespace: dataImportCron.Namespace,
1✔
616
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
617
        }
1✔
618
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
619
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
620
                now := metav1.Now()
1✔
621
                dataImportCron.Status.LastImportTimestamp = &now
1✔
622
        }
1✔
623
        return nil
1✔
624
}
625

626
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
627
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
628
        if lastTimeStr == "" {
2✔
629
                return nil
1✔
630
        }
1✔
631
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
632
        if err != nil {
1✔
633
                return err
×
634
        }
×
635
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
636
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
637
        }
1✔
638
        return nil
1✔
639
}
640

641
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
642
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
643
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
644
        if digest == "" {
1✔
645
                return nil
×
646
        }
×
647
        dvName, err := createDvName(dataSourceName, digest)
1✔
648
        if err != nil {
2✔
649
                return err
1✔
650
        }
1✔
651
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
652

1✔
653
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
654
        for _, src := range sources {
2✔
655
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
656
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
657
                                return err
×
658
                        }
×
659
                } else {
1✔
660
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
661
                                return err
×
662
                        }
×
663
                        // If source exists don't create DV
664
                        return nil
1✔
665
                }
666
        }
667

668
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
669
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
670
                return err
×
671
        }
×
672

673
        return nil
1✔
674
}
675

676
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
677
        switch format {
1✔
678
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
679
                return nil
1✔
680
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
681
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
682
        default:
×
683
                return fmt.Errorf("unknown source format for snapshot")
×
684
        }
685
}
686

687
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
688
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
689
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
690
                return nil
1✔
691
        }
1✔
692
        storageProfile := &cdiv1.StorageProfile{}
1✔
693
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
694
                return err
×
695
        }
×
696
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
697
        if err != nil {
1✔
698
                return err
×
699
        }
×
700
        labels := map[string]string{
1✔
701
                common.CDILabelKey:       common.CDILabelValue,
1✔
702
                common.CDIComponentLabel: "",
1✔
703
        }
1✔
704
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
705
                ObjectMeta: metav1.ObjectMeta{
1✔
706
                        Name:      pvc.Name,
1✔
707
                        Namespace: dataImportCron.Namespace,
1✔
708
                        Labels:    labels,
1✔
709
                },
1✔
710
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
711
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
712
                                PersistentVolumeClaimName: &pvc.Name,
1✔
713
                        },
1✔
714
                        VolumeSnapshotClassName: &className,
1✔
715
                },
1✔
716
        }
1✔
717
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
718

1✔
719
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
720
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
721
                if !k8serrors.IsNotFound(err) {
1✔
722
                        return err
×
723
                }
×
724
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
725
                if pvc.Spec.VolumeMode != nil {
2✔
726
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
727
                }
1✔
728
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
729
                        return err
×
730
                }
×
731
        } else {
1✔
732
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
733
                        // Clean up DV/PVC as they are not needed anymore
1✔
734
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
735
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
736
                                return err
×
737
                        }
×
738
                }
739
        }
740

741
        return nil
1✔
742
}
743

744
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
745
        dataImportCron.Status.SourceFormat = &format
1✔
746

1✔
747
        switch format {
1✔
748
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
749
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
750
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
751
                if snapshot == nil {
2✔
752
                        // Snapshot create/update will trigger reconcile
1✔
753
                        return nil
1✔
754
                }
1✔
755
                if cc.IsSnapshotReady(snapshot) {
2✔
756
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
757
                } else {
2✔
758
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
759
                }
1✔
760
        default:
×
761
                return fmt.Errorf("unknown source format for snapshot")
×
762
        }
763

764
        return nil
1✔
765
}
766

767
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
768
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
769
        if desiredStorageClass == nil {
2✔
770
                return format, nil
1✔
771
        }
1✔
772

773
        storageProfile := &cdiv1.StorageProfile{}
1✔
774
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
775
                return format, err
×
776
        }
×
777
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
778
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
779
        }
1✔
780

781
        return format, nil
1✔
782
}
783

784
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
785
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
786
                return nil
×
787
        }
×
788
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
789
        if err != nil {
1✔
790
                return err
×
791
        }
×
792

793
        maxImports := defaultImportsToKeepPerCron
1✔
794

1✔
795
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
796
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
797
        }
1✔
798

799
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
800
                return err
×
801
        }
×
802
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
803
                return err
×
804
        }
×
805

806
        return nil
1✔
807
}
808

809
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
810
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
811

1✔
812
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
813
                return err
×
814
        }
×
815
        if len(pvcList.Items) > maxImports {
2✔
816
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
817
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
818
                })
1✔
819
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
820
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
821
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
822
                                return err
×
823
                        }
×
824
                }
825
        }
826

827
        dvList := &cdiv1.DataVolumeList{}
1✔
828
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
829
                return err
×
830
        }
×
831

832
        if len(dvList.Items) > maxImports {
2✔
833
                for _, dv := range dvList.Items {
2✔
834
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
835
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
836
                                return err
×
837
                        }
×
838

839
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
840
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
841
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
842
                                        return err
×
843
                                }
×
844
                        }
845
                }
846
        }
847

848
        return nil
1✔
849
}
850

851
// deleteDvPvc deletes DV or PVC if DV was GCed
852
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
853
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
854
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
855
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
856
                return err
1✔
857
        }
1✔
858
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
859
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
860
                return err
×
861
        }
×
862
        return nil
1✔
863
}
864

865
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
866
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
867

1✔
868
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
869
                if meta.IsNoMatchError(err) {
×
870
                        return nil
×
871
                }
×
872
                return err
×
873
        }
874
        if len(snapList.Items) > maxImports {
1✔
875
                sort.Slice(snapList.Items, func(i, j int) bool {
×
876
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
877
                })
×
878
                for _, snap := range snapList.Items[maxImports:] {
×
879
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
880
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
881
                                return err
×
882
                        }
×
883
                }
884
        }
885

886
        return nil
1✔
887
}
888

889
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
890
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
891
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
892

1✔
893
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
894
                return err
×
895
        }
×
896
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
897
        if err != nil {
1✔
898
                return err
×
899
        }
×
900
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
901
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
902
                return err
×
903
        }
×
904
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
905
                return err
×
906
        }
×
907
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
908
                return err
×
909
        }
×
910
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
911
                return err
×
912
        }
×
913
        return nil
1✔
914
}
915

916
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
917
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
918
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
919
        if err != nil {
1✔
920
                return err
×
921
        }
×
922
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
923
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
924
                return err
×
925
        }
×
926
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
927
                return err
×
928
        }
×
929

930
        return nil
1✔
931
}
932

933
// NewDataImportCronController creates a new instance of the DataImportCron controller
934
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
935
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
936
                Scheme: mgr.GetScheme(),
×
937
                Mapper: mgr.GetRESTMapper(),
×
938
        })
×
939
        if err != nil {
×
940
                return nil, err
×
941
        }
×
942
        reconciler := &DataImportCronReconciler{
×
943
                client:          mgr.GetClient(),
×
944
                uncachedClient:  uncachedClient,
×
945
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
946
                scheme:          mgr.GetScheme(),
×
947
                log:             log.WithName(dataImportControllerName),
×
948
                image:           importerImage,
×
949
                pullPolicy:      pullPolicy,
×
950
                cdiNamespace:    util.GetNamespace(),
×
951
                installerLabels: installerLabels,
×
952
        }
×
953
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
954
                MaxConcurrentReconciles: 3,
×
955
                Reconciler:              reconciler,
×
956
        })
×
957
        if err != nil {
×
958
                return nil, err
×
959
        }
×
960
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
961
                return nil, err
×
962
        }
×
963
        log.Info("Initialized DataImportCron controller")
×
964
        return dataImportCronController, nil
×
965
}
966

NEW
967
func getCronName(obj client.Object) string {
×
NEW
968
        return obj.GetLabels()[common.DataImportCronLabel]
×
NEW
969
}
×
970

NEW
971
func getCronNs(obj client.Object) string {
×
NEW
972
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
NEW
973
}
×
974

NEW
975
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
NEW
976
        if cronName := getCronName(obj); cronName != "" {
×
NEW
977
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
978
        }
×
NEW
979
        return nil
×
980
}
981

NEW
982
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
NEW
983
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
NEW
984
                return err
×
UNCOV
985
        }
×
986

NEW
987
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
988
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
989
                // Otherwise we risk losing the storage profile event
×
990
                var crons cdiv1.DataImportCronList
×
991
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
992
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
993
                        return nil
×
994
                }
×
995
                // Storage profiles are 1:1 to storage classes
996
                scName := obj.GetName()
×
997
                var reqs []reconcile.Request
×
998
                for _, cron := range crons.Items {
×
999
                        dataVolume := cron.Spec.Template
×
1000
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1001
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1002
                        if err != nil || templateSc == nil {
×
1003
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1004
                                return reqs
×
1005
                        }
×
1006
                        if templateSc.Name == scName {
×
1007
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1008
                        }
×
1009
                }
1010
                return reqs
×
1011
        }
1012

NEW
1013
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
NEW
1014
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
NEW
1015
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
NEW
1016
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
NEW
1017
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
NEW
1018
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1019
                },
NEW
1020
        )); err != nil {
×
1021
                return err
×
1022
        }
×
1023

NEW
1024
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
NEW
1025
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
NEW
1026
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
NEW
1027
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
NEW
1028
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
NEW
1029
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1030
                },
NEW
1031
        )); err != nil {
×
1032
                return err
×
1033
        }
×
1034

NEW
1035
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
NEW
1036
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
NEW
1037
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
NEW
1038
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
NEW
1039
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
NEW
1040
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1041
                },
NEW
1042
        )); err != nil {
×
1043
                return err
×
1044
        }
×
1045

1046
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1047
                return err
×
1048
        }
×
1049

NEW
1050
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
NEW
1051
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
NEW
1052
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
NEW
1053
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
NEW
1054
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
NEW
1055
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
NEW
1056
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
UNCOV
1057
                        },
×
1058
                },
NEW
1059
        )); err != nil {
×
1060
                return err
×
1061
        }
×
1062

NEW
1063
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1064
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1065
        }
×
1066

NEW
1067
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
NEW
1068
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
NEW
1069
                predicate.TypedFuncs[*batchv1.CronJob]{
×
NEW
1070
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
NEW
1071
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
NEW
1072
                        },
×
NEW
1073
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
NEW
1074
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1075
                },
NEW
1076
        )); err != nil {
×
1077
                return err
×
1078
        }
×
1079

1080
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1081
                if meta.IsNoMatchError(err) {
×
1082
                        // Back out if there's no point to attempt watch
×
1083
                        return nil
×
1084
                }
×
1085
                if !cc.IsErrCacheNotStarted(err) {
×
1086
                        return err
×
1087
                }
×
1088
        }
NEW
1089
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
NEW
1090
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
NEW
1091
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
NEW
1092
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
NEW
1093
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
NEW
1094
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1095
                },
NEW
1096
        )); err != nil {
×
1097
                return err
×
1098
        }
×
1099

1100
        return nil
×
1101
}
1102

1103
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1104
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
NEW
1105
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
NEW
1106
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
NEW
1107
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1108
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1109
                                log.Info("Update", "sc", obj.GetName(),
×
1110
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1111
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1112
                                reqs, err := getReconcileRequestsForDicsWithPendingPvc(ctx, mgr.GetClient())
×
1113
                                if err != nil {
×
1114
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1115
                                }
×
1116
                                return reqs
×
1117
                        },
1118
                ),
1119
                predicate.TypedFuncs[*storagev1.StorageClass]{
NEW
1120
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
NEW
1121
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
NEW
1122
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
NEW
1123
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
NEW
1124
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
UNCOV
1125
                        },
×
1126
                },
NEW
1127
        )); err != nil {
×
1128
                return err
×
1129
        }
×
1130

1131
        return nil
×
1132
}
1133

1134
func getReconcileRequestsForDicsWithPendingPvc(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1135
        dicList := &cdiv1.DataImportCronList{}
×
1136
        if err := c.List(ctx, dicList); err != nil {
×
1137
                return nil, err
×
1138
        }
×
1139
        reqs := []reconcile.Request{}
×
1140
        for _, dic := range dicList.Items {
×
1141
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1142
                        continue
×
1143
                }
1144

1145
                imports := dic.Status.CurrentImports
×
1146
                if len(imports) == 0 {
×
1147
                        continue
×
1148
                }
1149

1150
                pvcName := imports[0].DataVolumeName
×
1151
                pvc := &corev1.PersistentVolumeClaim{}
×
1152
                if err := c.Get(ctx, types.NamespacedName{Namespace: dic.Namespace, Name: pvcName}, pvc); err != nil {
×
1153
                        if k8serrors.IsNotFound(err) {
×
1154
                                continue
×
1155
                        }
1156
                        return nil, err
×
1157
                }
1158

1159
                if pvc.Status.Phase == corev1.ClaimPending {
×
1160
                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1161
                }
×
1162
        }
1163

1164
        return reqs, nil
×
1165
}
1166

1167
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1168
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1169
                return false, nil
1✔
1170
        }
1✔
1171

1172
        sc := pvc.Spec.StorageClassName
1✔
1173
        if sc == nil || *sc == desiredStorageClass {
2✔
1174
                return false, nil
1✔
1175
        }
1✔
1176

1177
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1178
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1179
                return false, err
×
1180
        }
×
1181

1182
        return true, nil
1✔
1183
}
1184

1185
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1186
        cronJob := &batchv1.CronJob{}
1✔
1187
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1188
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1189
                return false, cc.IgnoreNotFound(err)
1✔
1190
        }
1✔
1191

1192
        cronJobCopy := cronJob.DeepCopy()
1✔
1193
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1194
                return false, err
×
1195
        }
×
1196

1197
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1198
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1199
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1200
                        return false, cc.IgnoreNotFound(err)
×
1201
                }
×
1202
        }
1203
        return true, nil
1✔
1204
}
1205

1206
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1207
        cronJob := &batchv1.CronJob{
1✔
1208
                ObjectMeta: metav1.ObjectMeta{
1✔
1209
                        Name:      GetCronJobName(cron),
1✔
1210
                        Namespace: r.cdiNamespace,
1✔
1211
                },
1✔
1212
        }
1✔
1213
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1214
                return nil, err
×
1215
        }
×
1216
        return cronJob, nil
1✔
1217
}
1218

1219
// InitPollerPodSpec inits poller PodSpec
1220
func InitPollerPodSpec(c client.Client, cron *cdiv1.DataImportCron, podSpec *corev1.PodSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1221
        regSource, err := getCronRegistrySource(cron)
1✔
1222
        if err != nil {
1✔
1223
                return err
×
1224
        }
×
1225
        if regSource.URL == nil {
1✔
1226
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1227
        }
×
1228
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1229
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1230
                return err
×
1231
        }
×
1232
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1233
        if err != nil {
1✔
1234
                return err
×
1235
        }
×
1236
        container := corev1.Container{
1✔
1237
                Name:  "cdi-source-update-poller",
1✔
1238
                Image: image,
1✔
1239
                Command: []string{
1✔
1240
                        "/usr/bin/cdi-source-update-poller",
1✔
1241
                        "-ns", cron.Namespace,
1✔
1242
                        "-cron", cron.Name,
1✔
1243
                        "-url", *regSource.URL,
1✔
1244
                },
1✔
1245
                ImagePullPolicy:          pullPolicy,
1✔
1246
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1247
                TerminationMessagePolicy: corev1.TerminationMessageReadFile,
1✔
1248
        }
1✔
1249

1✔
1250
        var volumes []corev1.Volume
1✔
1251
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1252
        if hasCertConfigMap {
1✔
1253
                vm := corev1.VolumeMount{
×
1254
                        Name:      CertVolName,
×
1255
                        MountPath: common.ImporterCertDir,
×
1256
                }
×
1257
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1258
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1259
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1260
        }
×
1261

1262
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1263
                vm := corev1.VolumeMount{
1✔
1264
                        Name:      ProxyCertVolName,
1✔
1265
                        MountPath: common.ImporterProxyCertDir,
1✔
1266
                }
1✔
1267
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1268
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1269
        }
1✔
1270

1271
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1272
                container.Env = append(container.Env,
×
1273
                        corev1.EnvVar{
×
1274
                                Name: common.ImporterAccessKeyID,
×
1275
                                ValueFrom: &corev1.EnvVarSource{
×
1276
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1277
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1278
                                                        Name: *regSource.SecretRef,
×
1279
                                                },
×
1280
                                                Key: common.KeyAccess,
×
1281
                                        },
×
1282
                                },
×
1283
                        },
×
1284
                        corev1.EnvVar{
×
1285
                                Name: common.ImporterSecretKey,
×
1286
                                ValueFrom: &corev1.EnvVarSource{
×
1287
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1288
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1289
                                                        Name: *regSource.SecretRef,
×
1290
                                                },
×
1291
                                                Key: common.KeySecret,
×
1292
                                        },
×
1293
                                },
×
1294
                        },
×
1295
                )
×
1296
        }
×
1297

1298
        addEnvVar := func(varName, value string) {
2✔
1299
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1300
        }
1✔
1301

1302
        if insecureTLS {
1✔
1303
                addEnvVar(common.InsecureTLSVar, "true")
×
1304
        }
×
1305

1306
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1307
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1308
                        addEnvVar(varName, value)
1✔
1309
                }
1✔
1310
        }
1311

1312
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1313
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1314
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1315

1✔
1316
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1317
        if err != nil {
1✔
1318
                return err
×
1319
        }
×
1320
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1321
        if err != nil {
1✔
1322
                return err
×
1323
        }
×
1324

1325
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1326
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1327
        podSpec.Containers = []corev1.Container{container}
1✔
1328
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1329
        podSpec.Volumes = volumes
1✔
1330
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1331
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1332
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1333
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1334

1✔
1335
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1336

1✔
1337
        return nil
1✔
1338
}
1339

1340
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1341
        cronJobSpec := &cronJob.Spec
1✔
1342
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1343
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1344
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1345
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1346

1✔
1347
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1348
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1349
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1350

1✔
1351
        podSpec := &jobSpec.Template.Spec
1✔
1352
        if err := InitPollerPodSpec(r.client, cron, podSpec, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1353
                return err
×
1354
        }
×
1355
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1356
                return err
×
1357
        }
×
1358
        return nil
1✔
1359
}
1360

1361
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1362
        job := &batchv1.Job{
1✔
1363
                ObjectMeta: metav1.ObjectMeta{
1✔
1364
                        Name:      GetInitialJobName(cron),
1✔
1365
                        Namespace: cronJob.Namespace,
1✔
1366
                },
1✔
1367
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1368
        }
1✔
1369
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1370
                return nil, err
×
1371
        }
×
1372
        return job, nil
1✔
1373
}
1374

1375
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1376
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1377
                return err
×
1378
        }
×
1379
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1380
        labels := obj.GetLabels()
1✔
1381
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1382
        labels[common.DataImportCronLabel] = cron.Name
1✔
1383
        obj.SetLabels(labels)
1✔
1384
        return nil
1✔
1385
}
1386

1387
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1388
        var digestedURL string
1✔
1389
        dv := cron.Spec.Template.DeepCopy()
1✔
1390
        if isURLSource(cron) {
2✔
1391
                digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1392
        } else if isImageStreamSource(cron) {
3✔
1393
                // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1394
                digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1395
                dv.Spec.Source.Registry.ImageStream = nil
1✔
1396
        }
1✔
1397
        dv.Spec.Source.Registry.URL = &digestedURL
1✔
1398
        dv.Name = dataVolumeName
1✔
1399
        dv.Namespace = cron.Namespace
1✔
1400
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1401
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1402
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1403
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1404

1✔
1405
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1406
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1407
        }
1✔
1408

1409
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1410

1✔
1411
        return dv
1✔
1412
}
1413

1414
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1415
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1416
        labels := obj.GetLabels()
1✔
1417
        labels[common.DataImportCronLabel] = cron.Name
1✔
1418
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1419
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1420
        }
1✔
1421
        obj.SetLabels(labels)
1✔
1422
}
1423

1424
func untagDigestedDockerURL(dockerURL string) string {
1✔
1425
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1426
                url := u.Host + u.Path
1✔
1427
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1428
                // Check for tag
1✔
1429
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1430
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1431
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1432
                        }
1✔
1433
                }
1434
        }
1435
        return dockerURL
1✔
1436
}
1437

1438
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1439
        if val := cron.Labels[ann]; val != "" {
2✔
1440
                cc.AddLabel(dv, ann, val)
1✔
1441
        }
1✔
1442
}
1443

1444
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1445
        if val := cron.Annotations[ann]; val != "" {
1✔
1446
                cc.AddAnnotation(dv, ann, val)
×
1447
        }
×
1448
}
1449

1450
func passCronLabelToDataSource(cron *cdiv1.DataImportCron, ds *cdiv1.DataSource, ann string) {
1✔
1451
        if val := cron.Labels[ann]; val != "" {
2✔
1452
                cc.AddLabel(ds, ann, val)
1✔
1453
        }
1✔
1454
}
1455

1456
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1457
        dataSource := &cdiv1.DataSource{
1✔
1458
                ObjectMeta: metav1.ObjectMeta{
1✔
1459
                        Name:      cron.Spec.ManagedDataSource,
1✔
1460
                        Namespace: cron.Namespace,
1✔
1461
                },
1✔
1462
        }
1✔
1463
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1464
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1465
        return dataSource
1✔
1466
}
1✔
1467

1468
// Create DataVolume name based on the DataSource name + prefix of the digest sha256
1469
func createDvName(prefix, digest string) (string, error) {
1✔
1470
        fromIdx := len(digestPrefix)
1✔
1471
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1472
        if !strings.HasPrefix(digest, digestPrefix) {
2✔
1473
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1474
        }
1✔
1475
        if len(digest) < toIdx {
2✔
1476
                return "", errors.Errorf("Digest is too short")
1✔
1477
        }
1✔
1478
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1479
}
1480

1481
// GetCronJobName get CronJob name based on cron name and UID
1482
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1483
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1484
}
1✔
1485

1486
// GetInitialJobName get initial job name based on cron name and UID
1487
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1488
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1489
}
1✔
1490

1491
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1492
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1493
}
1✔
1494

1495
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1496
        dv := &cron.Spec.Template
1✔
1497

1✔
1498
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1499
                return explicitVolumeMode, nil
×
1500
        }
×
1501

1502
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1503
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1504
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1505
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1506
                        AccessModes:      accessModes,
1✔
1507
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1508
                        Resources: corev1.VolumeResourceRequirements{
1✔
1509
                                Requests: corev1.ResourceList{
1✔
1510
                                        // Doesn't matter
1✔
1511
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1512
                                },
1✔
1513
                        },
1✔
1514
                },
1✔
1515
        }
1✔
1516
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1517
                return nil, err
×
1518
        }
×
1519

1520
        return inferredPvc.Spec.VolumeMode, nil
1✔
1521
}
1522

1523
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1524
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1525
        if dv.Spec.PVC != nil {
1✔
1526
                return dv.Spec.PVC.VolumeMode
×
1527
        }
×
1528

1529
        if dv.Spec.Storage != nil {
2✔
1530
                return dv.Spec.Storage.VolumeMode
1✔
1531
        }
1✔
1532

1533
        return nil
×
1534
}
1535

1536
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1537
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1538
        if dv.Spec.PVC != nil {
1✔
1539
                return dv.Spec.PVC.AccessModes
×
1540
        }
×
1541

1542
        if dv.Spec.Storage != nil {
2✔
1543
                return dv.Spec.Storage.AccessModes
1✔
1544
        }
1✔
1545

1546
        return nil
×
1547
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc