• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #6009

14 May 2026 08:29PM UTC coverage: 49.608% (+0.04%) from 49.568%
#6009

Pull #4137

travis-ci

dsanatar
refactor UpdatePVCBoundContionFromEvents

update logic to prioritize bound condition msg
from prime pvc if one exists. update function
name to reflect this behavior change

no longer set conditions for prime pvc as well,
only the target pvc needs to be updated in order for
events to get propagated to the DV.

extract out event parsing logic to new
getLatestEventMessage helper func.

Signed-off-by: dsanatar <dsanatar@redhat.com>
Assisted-by: Claude <noreply@anthropic.com>
Pull Request #4137: Refactor PVC event propagation for populators

20 of 56 new or added lines in 7 files covered. (35.71%)

38 existing lines in 2 files now uncovered.

14989 of 30215 relevant lines covered (49.61%)

0.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.09
/pkg/controller/common/util.go
1
/*
2
Copyright 2022 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package common
18

19
import (
20
        "context"
21
        "crypto/rand"
22
        "crypto/rsa"
23
        "crypto/tls"
24
        "fmt"
25
        "io"
26
        "net"
27
        "net/http"
28
        "reflect"
29
        "regexp"
30
        "sort"
31
        "strconv"
32
        "strings"
33
        "sync"
34
        "time"
35

36
        "github.com/go-logr/logr"
37
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
38
        ocpconfigv1 "github.com/openshift/api/config/v1"
39
        "github.com/pkg/errors"
40

41
        corev1 "k8s.io/api/core/v1"
42
        storagev1 "k8s.io/api/storage/v1"
43
        extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
44
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
45
        "k8s.io/apimachinery/pkg/api/meta"
46
        "k8s.io/apimachinery/pkg/api/resource"
47
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
48
        "k8s.io/apimachinery/pkg/labels"
49
        "k8s.io/apimachinery/pkg/runtime"
50
        "k8s.io/apimachinery/pkg/types"
51
        "k8s.io/apimachinery/pkg/util/sets"
52
        "k8s.io/client-go/tools/cache"
53
        "k8s.io/client-go/tools/record"
54
        "k8s.io/klog/v2"
55
        "k8s.io/utils/ptr"
56

57
        runtimecache "sigs.k8s.io/controller-runtime/pkg/cache"
58
        "sigs.k8s.io/controller-runtime/pkg/client"
59
        "sigs.k8s.io/controller-runtime/pkg/client/fake"
60

61
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
62
        cdiv1utils "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1/utils"
63
        "kubevirt.io/containerized-data-importer/pkg/client/clientset/versioned/scheme"
64
        "kubevirt.io/containerized-data-importer/pkg/common"
65
        featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
66
        "kubevirt.io/containerized-data-importer/pkg/token"
67
        "kubevirt.io/containerized-data-importer/pkg/util"
68
        sdkapi "kubevirt.io/controller-lifecycle-operator-sdk/api"
69
)
70

71
const (
72
        // DataVolName provides a const to use for creating volumes in pod specs
73
        DataVolName = "cdi-data-vol"
74

75
        // ScratchVolName provides a const to use for creating scratch pvc volumes in pod specs
76
        ScratchVolName = "cdi-scratch-vol"
77

78
        // AnnAPIGroup is the APIGroup for CDI
79
        AnnAPIGroup = "cdi.kubevirt.io"
80
        // AnnCreatedBy is a pod annotation indicating if the pod was created by the PVC
81
        AnnCreatedBy = AnnAPIGroup + "/storage.createdByController"
82
        // AnnPodPhase is a PVC annotation indicating the related pod progress (phase)
83
        AnnPodPhase = AnnAPIGroup + "/storage.pod.phase"
84
        // AnnPodReady tells whether the pod is ready
85
        AnnPodReady = AnnAPIGroup + "/storage.pod.ready"
86
        // AnnPodRestarts is a PVC annotation that tells how many times a related pod was restarted
87
        AnnPodRestarts = AnnAPIGroup + "/storage.pod.restarts"
88
        // AnnPodSchedulable is a PVC annotation that tells if the Pod is schedulable or not
89
        AnnPodSchedulable = AnnAPIGroup + "/storage.pod.schedulable"
90
        // AnnPopulatedFor is a PVC annotation telling the datavolume controller that the PVC is already populated
91
        AnnPopulatedFor = AnnAPIGroup + "/storage.populatedFor"
92
        // AnnPrePopulated is a PVC annotation telling the datavolume controller that the PVC is already populated
93
        AnnPrePopulated = AnnAPIGroup + "/storage.prePopulated"
94
        // AnnPriorityClassName is PVC annotation to indicate the priority class name for importer, cloner and uploader pod
95
        AnnPriorityClassName = AnnAPIGroup + "/storage.pod.priorityclassname"
96
        // AnnPodServiceAccount is a PVC annotation to indicate the service account name for importer and uploader pod
97
        AnnPodServiceAccount = AnnAPIGroup + "/storage.pod.serviceAccountName"
98
        // AnnExternalPopulation annotation marks a PVC as "externally populated", allowing the import-controller to skip it
99
        AnnExternalPopulation = AnnAPIGroup + "/externalPopulation"
100

101
        // AnnPodRetainAfterCompletion is PVC annotation for retaining transfer pods after completion
102
        AnnPodRetainAfterCompletion = AnnAPIGroup + "/storage.pod.retainAfterCompletion"
103

104
        // AnnPreviousCheckpoint provides a const to indicate the previous snapshot for a multistage import
105
        AnnPreviousCheckpoint = AnnAPIGroup + "/storage.checkpoint.previous"
106
        // AnnCurrentCheckpoint provides a const to indicate the current snapshot for a multistage import
107
        AnnCurrentCheckpoint = AnnAPIGroup + "/storage.checkpoint.current"
108
        // AnnFinalCheckpoint provides a const to indicate whether the current checkpoint is the last one
109
        AnnFinalCheckpoint = AnnAPIGroup + "/storage.checkpoint.final"
110
        // AnnCheckpointsCopied is a prefix for recording which checkpoints have already been copied
111
        AnnCheckpointsCopied = AnnAPIGroup + "/storage.checkpoint.copied"
112

113
        // AnnCurrentPodID keeps track of the latest pod servicing this PVC
114
        AnnCurrentPodID = AnnAPIGroup + "/storage.checkpoint.pod.id"
115
        // AnnMultiStageImportDone marks a multi-stage import as totally finished
116
        AnnMultiStageImportDone = AnnAPIGroup + "/storage.checkpoint.done"
117

118
        // AnnPopulatorProgress is a standard annotation that can be used for progress reporting
119
        AnnPopulatorProgress = AnnAPIGroup + "/storage.populator.progress"
120

121
        // AnnPreallocationRequested provides a const to indicate whether preallocation should be performed on the PV
122
        AnnPreallocationRequested = AnnAPIGroup + "/storage.preallocation.requested"
123
        // AnnPreallocationApplied provides a const for PVC preallocation annotation
124
        AnnPreallocationApplied = AnnAPIGroup + "/storage.preallocation"
125

126
        // AnnRunningCondition provides a const for the running condition
127
        AnnRunningCondition = AnnAPIGroup + "/storage.condition.running"
128
        // AnnRunningConditionMessage provides a const for the running condition message
129
        AnnRunningConditionMessage = AnnAPIGroup + "/storage.condition.running.message"
130
        // AnnRunningConditionReason provides a const for the running condition reason
131
        AnnRunningConditionReason = AnnAPIGroup + "/storage.condition.running.reason"
132

133
        // AnnBoundCondition provides a const for the bound condition
134
        AnnBoundCondition = AnnAPIGroup + "/storage.condition.bound"
135
        // AnnBoundConditionMessage provides a const for the bound condition message
136
        AnnBoundConditionMessage = AnnAPIGroup + "/storage.condition.bound.message"
137
        // AnnBoundConditionReason provides a const for the bound condition reason
138
        AnnBoundConditionReason = AnnAPIGroup + "/storage.condition.bound.reason"
139

140
        // AnnSourceRunningCondition provides a const for the source running condition
141
        AnnSourceRunningCondition = AnnAPIGroup + "/storage.condition.source.running"
142
        // AnnSourceRunningConditionMessage provides a const for the source running condition message
143
        AnnSourceRunningConditionMessage = AnnAPIGroup + "/storage.condition.source.running.message"
144
        // AnnSourceRunningConditionReason provides a const for the source running condition reason
145
        AnnSourceRunningConditionReason = AnnAPIGroup + "/storage.condition.source.running.reason"
146

147
        // AnnVddkVersion shows the last VDDK library version used by a DV's importer pod
148
        AnnVddkVersion = AnnAPIGroup + "/storage.pod.vddk.version"
149
        // AnnVddkHostConnection shows the last ESX host that serviced a DV's importer pod
150
        AnnVddkHostConnection = AnnAPIGroup + "/storage.pod.vddk.host"
151
        // AnnVddkInitImageURL saves a per-DV VDDK image URL on the PVC
152
        AnnVddkInitImageURL = AnnAPIGroup + "/storage.pod.vddk.initimageurl"
153
        // AnnVddkExtraArgs references a ConfigMap that holds arguments to pass directly to the VDDK library
154
        AnnVddkExtraArgs = AnnAPIGroup + "/storage.pod.vddk.extraargs"
155

156
        // AnnRequiresScratch provides a const for our PVC requiring scratch annotation
157
        AnnRequiresScratch = AnnAPIGroup + "/storage.import.requiresScratch"
158

159
        // AnnRequiresDirectIO provides a const for our PVC requiring direct io annotation (due to OOMs we need to try qemu cache=none)
160
        AnnRequiresDirectIO = AnnAPIGroup + "/storage.import.requiresDirectIo"
161
        // OOMKilledReason provides a value that container runtimes must return in the reason field for an OOMKilled container
162
        OOMKilledReason = "OOMKilled"
163

164
        // AnnContentType provides a const for the PVC content-type
165
        AnnContentType = AnnAPIGroup + "/storage.contentType"
166

167
        // AnnSource provide a const for our PVC import source annotation
168
        AnnSource = AnnAPIGroup + "/storage.import.source"
169
        // AnnEndpoint provides a const for our PVC endpoint annotation
170
        AnnEndpoint = AnnAPIGroup + "/storage.import.endpoint"
171

172
        // AnnSecret provides a const for our PVC secretName annotation
173
        AnnSecret = AnnAPIGroup + "/storage.import.secretName"
174
        // AnnCertConfigMap is the name of a configmap containing tls certs
175
        AnnCertConfigMap = AnnAPIGroup + "/storage.import.certConfigMap"
176
        // AnnRegistryImportMethod provides a const for registry import method annotation
177
        AnnRegistryImportMethod = AnnAPIGroup + "/storage.import.registryImportMethod"
178
        // AnnRegistryImageStream provides a const for registry image stream annotation
179
        AnnRegistryImageStream = AnnAPIGroup + "/storage.import.registryImageStream"
180
        // AnnImportPod provides a const for our PVC importPodName annotation
181
        AnnImportPod = AnnAPIGroup + "/storage.import.importPodName"
182
        // AnnDiskID provides a const for our PVC diskId annotation
183
        AnnDiskID = AnnAPIGroup + "/storage.import.diskId"
184
        // AnnUUID provides a const for our PVC uuid annotation
185
        AnnUUID = AnnAPIGroup + "/storage.import.uuid"
186
        // AnnInsecureSkipVerify provides a const for skipping certificate verification
187
        AnnInsecureSkipVerify = AnnAPIGroup + "/storage.import.insecureSkipVerify"
188
        // AnnBackingFile provides a const for our PVC backing file annotation
189
        AnnBackingFile = AnnAPIGroup + "/storage.import.backingFile"
190
        // AnnThumbprint provides a const for our PVC backing thumbprint annotation
191
        AnnThumbprint = AnnAPIGroup + "/storage.import.vddk.thumbprint"
192
        // AnnExtraHeaders provides a const for our PVC extraHeaders annotation
193
        AnnExtraHeaders = AnnAPIGroup + "/storage.import.extraHeaders"
194
        // AnnSecretExtraHeaders provides a const for our PVC secretExtraHeaders annotation
195
        AnnSecretExtraHeaders = AnnAPIGroup + "/storage.import.secretExtraHeaders"
196
        // AnnRegistryImageArchitecture provides a const for our PVC registryImageArchitecture annotation
197
        AnnRegistryImageArchitecture = AnnAPIGroup + "/storage.import.registryImageArchitecture"
198

199
        // AnnCloneToken is the annotation containing the clone token
200
        AnnCloneToken = AnnAPIGroup + "/storage.clone.token"
201
        // AnnExtendedCloneToken is the annotation containing the long term clone token
202
        AnnExtendedCloneToken = AnnAPIGroup + "/storage.extended.clone.token"
203
        // AnnPermissiveClone annotation allows the clone-controller to skip the clone size validation
204
        AnnPermissiveClone = AnnAPIGroup + "/permissiveClone"
205
        // AnnOwnerUID annotation has the owner UID
206
        AnnOwnerUID = AnnAPIGroup + "/ownerUID"
207
        // AnnCloneType is the computed/requested clone type
208
        AnnCloneType = AnnAPIGroup + "/cloneType"
209
        // AnnCloneSourcePod name of the source clone pod
210
        AnnCloneSourcePod = AnnAPIGroup + "/storage.sourceClonePodName"
211

212
        // AnnUploadRequest marks that a PVC should be made available for upload
213
        AnnUploadRequest = AnnAPIGroup + "/storage.upload.target"
214

215
        // AnnCheckStaticVolume checks if a statically allocated PV exists before creating the target PVC.
216
        // If so, PVC is still created but population is skipped
217
        AnnCheckStaticVolume = AnnAPIGroup + "/storage.checkStaticVolume"
218

219
        // AnnPersistentVolumeList is an annotation storing a list of PV names
220
        AnnPersistentVolumeList = AnnAPIGroup + "/storage.persistentVolumeList"
221

222
        // AnnPopulatorKind annotation is added to a PVC' to specify the population kind, so it's later
223
        // checked by the common populator watches.
224
        AnnPopulatorKind = AnnAPIGroup + "/storage.populator.kind"
225
        // AnnUsePopulator annotation indicates if the datavolume population will use populators
226
        AnnUsePopulator = AnnAPIGroup + "/storage.usePopulator"
227

228
        // AnnMinimumSupportedPVCSize annotation on a StorageProfile specifies its minimum supported PVC size
229
        AnnMinimumSupportedPVCSize = AnnAPIGroup + "/minimumSupportedPvcSize"
230
        // AnnUseReadWriteOnceForDataImportCron annotation on a StorageProfile signals that DataImportCron should use RWO for DataImportCron PVCs
231
        AnnUseReadWriteOnceForDataImportCron = AnnAPIGroup + "/useReadWriteOnceForDataImportCron"
232
        // AnnSnapshotClassForDataImportCron annotation on a StorageProfile specifies the VolumeSnapshotClass to use for DataImportCron snapshots
233
        AnnSnapshotClassForDataImportCron = AnnAPIGroup + "/snapshotClassForDataImportCron"
234

235
        // AnnDefaultStorageClass is the annotation indicating that a storage class is the default one
236
        AnnDefaultStorageClass = "storageclass.kubernetes.io/is-default-class"
237
        // AnnDefaultVirtStorageClass is the annotation indicating that a storage class is the default one for virtualization purposes
238
        AnnDefaultVirtStorageClass = "storageclass.kubevirt.io/is-default-virt-class"
239
        // AnnDefaultSnapshotClass is the annotation indicating that a snapshot class is the default one
240
        AnnDefaultSnapshotClass = "snapshot.storage.kubernetes.io/is-default-class"
241

242
        // AnnSourceVolumeMode is the volume mode of the source PVC specified as an annotation on snapshots
243
        AnnSourceVolumeMode = AnnAPIGroup + "/storage.import.sourceVolumeMode"
244
        // AnnAdvisedRestoreSize is the advised restore size for disks restored from the snapshot
245
        AnnAdvisedRestoreSize = AnnAPIGroup + "/storage.import.advisedRestoreSize"
246

247
        // AnnOpenShiftImageLookup is the annotation for OpenShift image stream lookup
248
        AnnOpenShiftImageLookup = "alpha.image.policy.openshift.io/resolve-names"
249

250
        // AnnCloneRequest sets our expected annotation for a CloneRequest
251
        AnnCloneRequest = "k8s.io/CloneRequest"
252
        // AnnCloneOf is used to indicate that cloning was complete
253
        AnnCloneOf = "k8s.io/CloneOf"
254

255
        // AnnPodNetwork is used for specifying Pod Network
256
        AnnPodNetwork = "k8s.v1.cni.cncf.io/networks"
257
        // AnnPodMultusDefaultNetwork is used for specifying default Pod Network
258
        AnnPodMultusDefaultNetwork = "v1.multus-cni.io/default-network"
259
        // AnnPodSidecarInjectionIstio is used for enabling/disabling Pod istio/AspenMesh sidecar injection
260
        AnnPodSidecarInjectionIstio = "sidecar.istio.io/inject"
261
        // AnnPodSidecarInjectionIstioDefault is the default value passed for AnnPodSidecarInjectionIstio
262
        AnnPodSidecarInjectionIstioDefault = "false"
263
        // AnnPodSidecarInjectionLinkerd is used to enable/disable linkerd sidecar injection
264
        AnnPodSidecarInjectionLinkerd = "linkerd.io/inject"
265
        // AnnPodSidecarInjectionLinkerdDefault is the default value passed for AnnPodSidecarInjectionLinkerd
266
        AnnPodSidecarInjectionLinkerdDefault = "disabled"
267

268
        // AnnImmediateBinding provides a const to indicate whether immediate binding should be performed on the PV (overrides global config)
269
        AnnImmediateBinding = AnnAPIGroup + "/storage.bind.immediate.requested"
270

271
        // AnnSelectedNode annotation is added to a PVC that has been triggered by scheduler to
272
        // be dynamically provisioned. Its value is the name of the selected node.
273
        AnnSelectedNode = "volume.kubernetes.io/selected-node"
274

275
        // CloneUniqueID is used as a special label to be used when we search for the pod
276
        CloneUniqueID = AnnAPIGroup + "/storage.clone.cloneUniqeId"
277

278
        // CloneSourceInUse is reason for event created when clone source pvc is in use
279
        CloneSourceInUse = "CloneSourceInUse"
280

281
        // CloneComplete message
282
        CloneComplete = "Clone Complete"
283

284
        cloneTokenLeeway = 10 * time.Second
285

286
        // Default value for preallocation option if not defined in DV or CDIConfig
287
        defaultPreallocation = false
288

289
        // ErrStartingPod provides a const to indicate that a pod wasn't able to start without providing sensitive information (reason)
290
        ErrStartingPod = "ErrStartingPod"
291
        // MessageErrStartingPod provides a const to indicate that a pod wasn't able to start without providing sensitive information (message)
292
        MessageErrStartingPod = "Error starting pod '%s': For more information, request access to cdi-deploy logs from your sysadmin"
293
        // ErrClaimNotValid provides a const to indicate a claim is not valid
294
        ErrClaimNotValid = "ErrClaimNotValid"
295
        // ErrExceededQuota provides a const to indicate the claim has exceeded the quota
296
        ErrExceededQuota = "ErrExceededQuota"
297
        // ErrIncompatiblePVC provides a const to indicate a clone is not possible due to an incompatible PVC
298
        ErrIncompatiblePVC = "ErrIncompatiblePVC"
299

300
        // SourceHTTP is the source type HTTP, if unspecified or invalid, it defaults to SourceHTTP
301
        SourceHTTP = "http"
302
        // SourceS3 is the source type S3
303
        SourceS3 = "s3"
304
        // SourceGCS is the source type GCS
305
        SourceGCS = "gcs"
306
        // SourceGlance is the source type of glance
307
        SourceGlance = "glance"
308
        // SourceNone means there is no source.
309
        SourceNone = "none"
310
        // SourceRegistry is the source type of Registry
311
        SourceRegistry = "registry"
312
        // SourceImageio is the source type ovirt-imageio
313
        SourceImageio = "imageio"
314
        // SourceVDDK is the source type of VDDK
315
        SourceVDDK = "vddk"
316

317
        // VolumeSnapshotClassSelected reports that a VolumeSnapshotClass was selected
318
        VolumeSnapshotClassSelected = "VolumeSnapshotClassSelected"
319
        // MessageStorageProfileVolumeSnapshotClassSelected reports that a VolumeSnapshotClass was selected according to StorageProfile
320
        MessageStorageProfileVolumeSnapshotClassSelected = "VolumeSnapshotClass selected according to StorageProfile"
321
        // MessageDefaultVolumeSnapshotClassSelected reports that the default VolumeSnapshotClass was selected
322
        MessageDefaultVolumeSnapshotClassSelected = "Default VolumeSnapshotClass selected"
323
        // MessageFirstVolumeSnapshotClassSelected reports that the first VolumeSnapshotClass was selected
324
        MessageFirstVolumeSnapshotClassSelected = "First VolumeSnapshotClass selected"
325

326
        // ClaimLost reason const
327
        ClaimLost = "ClaimLost"
328
        // NotFound reason const
329
        NotFound = "NotFound"
330

331
        // LabelDefaultInstancetype provides a default VirtualMachine{ClusterInstancetype,Instancetype} that can be used by a VirtualMachine booting from a given PVC
332
        LabelDefaultInstancetype = "instancetype.kubevirt.io/default-instancetype"
333
        // LabelDefaultInstancetypeKind provides a default kind of either VirtualMachineClusterInstancetype or VirtualMachineInstancetype
334
        LabelDefaultInstancetypeKind = "instancetype.kubevirt.io/default-instancetype-kind"
335
        // LabelDefaultPreference provides a default VirtualMachine{ClusterPreference,Preference} that can be used by a VirtualMachine booting from a given PVC
336
        LabelDefaultPreference = "instancetype.kubevirt.io/default-preference"
337
        // LabelDefaultPreferenceKind provides a default kind of either VirtualMachineClusterPreference or VirtualMachinePreference
338
        LabelDefaultPreferenceKind = "instancetype.kubevirt.io/default-preference-kind"
339

340
        // LabelDynamicCredentialSupport specifies if the OS supports updating credentials at runtime.
341
        //nolint:gosec // These are not credentials
342
        LabelDynamicCredentialSupport = "kubevirt.io/dynamic-credentials-support"
343

344
        // LabelExcludeFromVeleroBackup provides a const to indicate whether an object should be excluded from velero backup
345
        LabelExcludeFromVeleroBackup = "velero.io/exclude-from-backup"
346

347
        // ProgressDone this means we are DONE
348
        ProgressDone = "100.0%"
349

350
        // AnnEventSourceKind is the source kind that should be related to events
351
        AnnEventSourceKind = AnnAPIGroup + "/events.source.kind"
352
        // AnnEventSource is the source that should be related to events (namespace/name)
353
        AnnEventSource = AnnAPIGroup + "/events.source"
354

355
        // AnnAllowClaimAdoption is the annotation that allows a claim to be adopted by a DataVolume
356
        AnnAllowClaimAdoption = AnnAPIGroup + "/allowClaimAdoption"
357

358
        // AnnCdiCustomizeComponentHash annotation is a hash of all customizations that live under spec.CustomizeComponents
359
        AnnCdiCustomizeComponentHash = AnnAPIGroup + "/customizer-identifier"
360

361
        // AnnCreatedForDataVolume stores the UID of the datavolume that the PVC was created for
362
        AnnCreatedForDataVolume = AnnAPIGroup + "/createdForDataVolume"
363

364
        // AnnPVCPrimeName annotation is the name of the PVC' that is used to populate the PV which is then rebound to the target PVC
365
        AnnPVCPrimeName = AnnAPIGroup + "/storage.populator.pvcPrime"
366
)
367

368
// Size-detection pod error codes.
// The values are assigned sequentially by iota, starting at 0 for NoErr,
// and every constant in the group is typed int.
const (
	NoErr           int = iota // 0
	ErrBadArguments            // 1
	ErrInvalidFile             // 2
	ErrInvalidPath             // 3
	ErrBadTermFile             // 4
	ErrUnknown                 // 5
)
377

378
var (
379
        // BlockMode is raw block device mode
380
        BlockMode = corev1.PersistentVolumeBlock
381
        // FilesystemMode is filesystem device mode
382
        FilesystemMode = corev1.PersistentVolumeFilesystem
383

384
        // DefaultInstanceTypeLabels is a list of currently supported default instance type labels
385
        DefaultInstanceTypeLabels = []string{
386
                LabelDefaultInstancetype,
387
                LabelDefaultInstancetypeKind,
388
                LabelDefaultPreference,
389
                LabelDefaultPreferenceKind,
390
        }
391

392
        apiServerKeyOnce sync.Once
393
        apiServerKey     *rsa.PrivateKey
394

395
        // allowedAnnotations is a list of annotations
396
        // that can be propagated from the pvc/dv to a pod
397
        allowedAnnotations = map[string]string{
398
                AnnPodNetwork:                 "",
399
                AnnPodSidecarInjectionIstio:   AnnPodSidecarInjectionIstioDefault,
400
                AnnPodSidecarInjectionLinkerd: AnnPodSidecarInjectionLinkerdDefault,
401
                AnnPriorityClassName:          "",
402
                AnnPodServiceAccount:          "",
403
                AnnPodMultusDefaultNetwork:    "",
404
        }
405

406
        validLabelsMatch = regexp.MustCompile(`^([\w.]+\.kubevirt.io|kubevirt.io)/[\w.-]+$`)
407

408
        ErrDataSourceMaxDepthReached = errors.New("DataSource reference chain exceeds maximum depth of 1")
409
        ErrDataSourceSelfReference   = errors.New("DataSource cannot self-reference")
410
        ErrDataSourceCrossNamespace  = errors.New("DataSource cannot reference a DataSource in another namespace")
411
)
412

413
// FakeValidator is a fake token validator
414
type FakeValidator struct {
415
        Match     string
416
        Operation token.Operation
417
        Name      string
418
        Namespace string
419
        Resource  metav1.GroupVersionResource
420
        Params    map[string]string
421
}
422

423
// Validate is a fake token validation
424
func (v *FakeValidator) Validate(value string) (*token.Payload, error) {
425
        if value != v.Match {
426
                return nil, fmt.Errorf("token does not match expected")
427
        }
428
        resource := metav1.GroupVersionResource{
429
                Resource: "persistentvolumeclaims",
430
        }
431
        return &token.Payload{
×
432
                Name:      v.Name,
×
433
                Namespace: v.Namespace,
×
434
                Operation: token.OperationClone,
×
435
                Resource:  resource,
×
436
                Params:    v.Params,
×
437
        }, nil
×
438
}
×
439

×
440
// MultiTokenValidator is a token validator that can validate both short and long tokens
×
441
type MultiTokenValidator struct {
×
442
        ShortTokenValidator token.Validator
×
443
        LongTokenValidator  token.Validator
×
444
}
×
445

446
// ValidatePVC validates a PVC
447
func (mtv *MultiTokenValidator) ValidatePVC(source, target *corev1.PersistentVolumeClaim) error {
448
        tok, v := mtv.getTokenAndValidator(target)
449
        return ValidateCloneTokenPVC(tok, v, source, target)
450
}
451

452
// ValidatePopulator valades a token for a populator
453
func (mtv *MultiTokenValidator) ValidatePopulator(vcs *cdiv1.VolumeCloneSource, pvc *corev1.PersistentVolumeClaim) error {
454
        if vcs.Namespace == pvc.Namespace {
×
455
                return nil
×
456
        }
×
457

×
458
        tok, v := mtv.getTokenAndValidator(pvc)
459

460
        tokenData, err := v.Validate(tok)
×
461
        if err != nil {
×
462
                return errors.Wrap(err, "error verifying token")
×
463
        }
×
464

465
        var tokenResourceName string
×
466
        switch vcs.Spec.Source.Kind {
×
467
        case "PersistentVolumeClaim":
×
468
                tokenResourceName = "persistentvolumeclaims"
×
469
        case "VolumeSnapshot":
×
470
                tokenResourceName = "volumesnapshots"
×
471
        }
472
        srcName := vcs.Spec.Source.Name
×
473

×
474
        return validateTokenData(tokenData, vcs.Namespace, srcName, pvc.Namespace, pvc.Name, string(pvc.UID), tokenResourceName)
×
475
}
×
476

×
477
func (mtv *MultiTokenValidator) getTokenAndValidator(pvc *corev1.PersistentVolumeClaim) (string, token.Validator) {
×
478
        v := mtv.LongTokenValidator
479
        tok, ok := pvc.Annotations[AnnExtendedCloneToken]
×
480
        if !ok {
×
481
                // if token doesn't exist, no prob for same namespace
×
482
                tok = pvc.Annotations[AnnCloneToken]
483
                v = mtv.ShortTokenValidator
484
        }
×
485
        return tok, v
×
486
}
×
487

×
488
// NewMultiTokenValidator returns a new multi token validator
×
489
func NewMultiTokenValidator(key *rsa.PublicKey) *MultiTokenValidator {
×
490
        return &MultiTokenValidator{
×
491
                ShortTokenValidator: NewCloneTokenValidator(common.CloneTokenIssuer, key),
×
492
                LongTokenValidator:  NewCloneTokenValidator(common.ExtendedCloneTokenIssuer, key),
×
493
        }
494
}
495

496
// NewCloneTokenValidator returns a new token validator
×
497
func NewCloneTokenValidator(issuer string, key *rsa.PublicKey) token.Validator {
×
498
        return token.NewValidator(issuer, key, cloneTokenLeeway)
×
499
}
×
500

×
501
// GetRequestedImageSize returns the PVC requested size
×
502
func GetRequestedImageSize(pvc *corev1.PersistentVolumeClaim) (string, error) {
503
        pvcSize, found := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
504
        if !found {
×
505
                return "", errors.Errorf("storage request is missing in pvc \"%s/%s\"", pvc.Namespace, pvc.Name)
×
506
        }
×
507
        return pvcSize.String(), nil
508
}
509

1✔
510
// GetVolumeMode returns the volumeMode from PVC handling default empty value
1✔
511
func GetVolumeMode(pvc *corev1.PersistentVolumeClaim) corev1.PersistentVolumeMode {
2✔
512
        return util.ResolveVolumeMode(pvc.Spec.VolumeMode)
1✔
513
}
1✔
514

1✔
515
// IsDataVolumeUsingDefaultStorageClass checks if the DataVolume is using the default StorageClass
516
func IsDataVolumeUsingDefaultStorageClass(dv *cdiv1.DataVolume) bool {
517
        return GetStorageClassFromDVSpec(dv) == nil
518
}
×
519

×
520
// GetStorageClassFromDVSpec returns the StorageClassName from DataVolume PVC or Storage spec
×
521
func GetStorageClassFromDVSpec(dv *cdiv1.DataVolume) *string {
522
        if dv.Spec.PVC != nil {
523
                return dv.Spec.PVC.StorageClassName
×
524
        }
×
525

×
526
        if dv.Spec.Storage != nil {
527
                return dv.Spec.Storage.StorageClassName
528
        }
×
529

×
530
        return nil
×
531
}
×
532

533
// getStorageClassByName looks up the storage class based on the name.
×
534
// If name is nil, it performs fallback to default according to the provided content type
×
535
// If no storage class is found, returns nil
×
536
func getStorageClassByName(ctx context.Context, client client.Client, name *string, contentType cdiv1.DataVolumeContentType) (*storagev1.StorageClass, error) {
537
        if name == nil {
×
538
                return getFallbackStorageClass(ctx, client, contentType)
539
        }
540

541
        // look up storage class by name
542
        storageClass := &storagev1.StorageClass{}
543
        if err := client.Get(ctx, types.NamespacedName{Name: *name}, storageClass); err != nil {
1✔
544
                if k8serrors.IsNotFound(err) {
2✔
545
                        return nil, nil
1✔
546
                }
1✔
547
                klog.V(3).Info("Unable to retrieve storage class", "storage class name", *name)
548
                return nil, errors.Errorf("unable to retrieve storage class %s", *name)
549
        }
×
550

×
551
        return storageClass, nil
×
552
}
×
553

×
554
// GetStorageClassByNameWithK8sFallback looks up the storage class based on the name
×
555
// If name is nil, it looks for the default k8s storage class storageclass.kubernetes.io/is-default-class
×
556
// If no storage class is found, returns nil
557
func GetStorageClassByNameWithK8sFallback(ctx context.Context, client client.Client, name *string) (*storagev1.StorageClass, error) {
558
        return getStorageClassByName(ctx, client, name, cdiv1.DataVolumeArchive)
×
559
}
560

561
// GetStorageClassByNameWithVirtFallback looks up the storage class based on the name
562
// If name is nil, it looks for the following, in this order:
563
// default kubevirt storage class (if the caller is interested) storageclass.kubevirt.io/is-default-class
564
// default k8s storage class storageclass.kubernetes.io/is-default-class
1✔
565
// If no storage class is found, returns nil
1✔
566
func GetStorageClassByNameWithVirtFallback(ctx context.Context, client client.Client, name *string, contentType cdiv1.DataVolumeContentType) (*storagev1.StorageClass, error) {
1✔
567
        return getStorageClassByName(ctx, client, name, contentType)
568
}
569

570
// getFallbackStorageClass looks for a default virt/k8s storage class according to the content type
571
// If no storage class is found, returns nil
572
func getFallbackStorageClass(ctx context.Context, client client.Client, contentType cdiv1.DataVolumeContentType) (*storagev1.StorageClass, error) {
573
        storageClasses := &storagev1.StorageClassList{}
1✔
574
        if err := client.List(ctx, storageClasses); err != nil {
1✔
575
                klog.V(3).Info("Unable to retrieve available storage classes")
1✔
576
                return nil, errors.New("unable to retrieve storage classes")
577
        }
578

579
        if GetContentType(contentType) == cdiv1.DataVolumeKubeVirt {
1✔
580
                if virtSc := GetPlatformDefaultStorageClass(storageClasses, AnnDefaultVirtStorageClass); virtSc != nil {
1✔
581
                        return virtSc, nil
1✔
582
                }
×
583
        }
×
584
        return GetPlatformDefaultStorageClass(storageClasses, AnnDefaultStorageClass), nil
×
585
}
586

2✔
587
// GetPlatformDefaultStorageClass returns the default storage class according to the provided annotation or nil if none found
2✔
588
func GetPlatformDefaultStorageClass(storageClasses *storagev1.StorageClassList, defaultAnnotationKey string) *storagev1.StorageClass {
1✔
589
        defaultClasses := []storagev1.StorageClass{}
1✔
590

591
        for _, storageClass := range storageClasses.Items {
1✔
592
                if storageClass.Annotations[defaultAnnotationKey] == "true" {
593
                        defaultClasses = append(defaultClasses, storageClass)
594
                }
595
        }
1✔
596

1✔
597
        if len(defaultClasses) == 0 {
1✔
598
                return nil
2✔
599
        }
2✔
600

1✔
601
        // Primary sort by creation timestamp, newest first
1✔
602
        // Secondary sort by class name, ascending order
603
        // Follows k8s behavior
604
        // https://github.com/kubernetes/kubernetes/blob/731068288e112c8b5af70f676296cc44661e84f4/pkg/volume/util/storageclass.go#L58-L59
2✔
605
        sort.Slice(defaultClasses, func(i, j int) bool {
1✔
606
                if defaultClasses[i].CreationTimestamp.UnixNano() == defaultClasses[j].CreationTimestamp.UnixNano() {
1✔
607
                        return defaultClasses[i].Name < defaultClasses[j].Name
608
                }
609
                return defaultClasses[i].CreationTimestamp.UnixNano() > defaultClasses[j].CreationTimestamp.UnixNano()
610
        })
611
        if len(defaultClasses) > 1 {
612
                klog.V(3).Infof("%d default StorageClasses were found, choosing: %s", len(defaultClasses), defaultClasses[0].Name)
2✔
613
        }
2✔
614

1✔
615
        return &defaultClasses[0]
1✔
616
}
1✔
617

618
// GetFilesystemOverheadForStorageClass determines the filesystem overhead defined in CDIConfig for the storageClass.
2✔
619
func GetFilesystemOverheadForStorageClass(ctx context.Context, client client.Client, storageClassName *string) (cdiv1.Percent, error) {
1✔
620
        if storageClassName != nil && *storageClassName == "" {
1✔
621
                klog.V(3).Info("No storage class name passed")
622
                return "0", nil
1✔
623
        }
624

625
        cdiConfig := &cdiv1.CDIConfig{}
626
        if err := client.Get(ctx, types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
×
627
                if k8serrors.IsNotFound(err) {
×
628
                        klog.V(1).Info("CDIConfig does not exist, pod will not start until it does")
×
629
                        return "0", nil
×
630
                }
×
631
                return "0", err
632
        }
×
633

×
634
        targetStorageClass, err := GetStorageClassByNameWithK8sFallback(ctx, client, storageClassName)
×
635
        if err != nil || targetStorageClass == nil {
×
636
                klog.V(3).Info("Storage class", storageClassName, "not found, trying default storage class")
×
637
                targetStorageClass, err = GetStorageClassByNameWithK8sFallback(ctx, client, nil)
×
638
                if err != nil {
×
639
                        klog.V(3).Info("No default storage class found, continuing with global overhead")
640
                        return cdiConfig.Status.FilesystemOverhead.Global, nil
641
                }
×
642
        }
×
643

×
644
        if cdiConfig.Status.FilesystemOverhead == nil {
×
645
                klog.Errorf("CDIConfig filesystemOverhead used before config controller ran reconcile. Hopefully this only happens during unit testing.")
×
646
                return "0", nil
×
647
        }
×
648

×
649
        if targetStorageClass == nil {
650
                klog.V(3).Info("Storage class", storageClassName, "not found, continuing with global overhead")
651
                return cdiConfig.Status.FilesystemOverhead.Global, nil
×
652
        }
×
653

×
654
        klog.V(3).Info("target storage class for overhead", targetStorageClass.GetName())
×
655

656
        perStorageConfig := cdiConfig.Status.FilesystemOverhead.StorageClass
×
657

×
658
        storageClassOverhead, found := perStorageConfig[targetStorageClass.GetName()]
×
659
        if found {
×
660
                return storageClassOverhead, nil
661
        }
×
662

×
663
        return cdiConfig.Status.FilesystemOverhead.Global, nil
×
664
}
×
665

×
666
// GetDefaultPodResourceRequirements gets default pod resource requirements from cdi config status
×
667
func GetDefaultPodResourceRequirements(client client.Client) (*corev1.ResourceRequirements, error) {
×
668
        cdiconfig := &cdiv1.CDIConfig{}
×
669
        if err := client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiconfig); err != nil {
670
                klog.Errorf("Unable to find CDI configuration, %v\n", err)
×
671
                return nil, err
672
        }
673

674
        return cdiconfig.Status.DefaultPodResourceRequirements, nil
×
675
}
×
676

×
677
// GetImagePullSecrets gets the imagePullSecrets needed to pull images from the cdi config
×
678
func GetImagePullSecrets(client client.Client) ([]corev1.LocalObjectReference, error) {
×
679
        cdiconfig := &cdiv1.CDIConfig{}
×
680
        if err := client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiconfig); err != nil {
681
                klog.Errorf("Unable to find CDI configuration, %v\n", err)
×
682
                return nil, err
683
        }
684

685
        return cdiconfig.Status.ImagePullSecrets, nil
×
686
}
×
687

×
688
// GetPodFromPvc determines the pod associated with the pvc passed in.
×
689
func GetPodFromPvc(c client.Client, namespace string, pvc *corev1.PersistentVolumeClaim) (*corev1.Pod, error) {
×
690
        l, _ := labels.Parse(common.PrometheusLabelKey)
×
691
        pods := &corev1.PodList{}
692
        listOptions := client.ListOptions{
×
693
                LabelSelector: l,
694
        }
695
        if err := c.List(context.TODO(), pods, &listOptions); err != nil {
696
                return nil, err
×
697
        }
×
698

×
699
        pvcUID := pvc.GetUID()
×
700
        for _, pod := range pods.Items {
×
701
                if ShouldIgnorePod(&pod, pvc) {
×
702
                        continue
×
703
                }
×
704
                for _, or := range pod.OwnerReferences {
×
705
                        if or.UID == pvcUID {
706
                                return &pod, nil
×
707
                        }
×
708
                }
×
709

×
710
                // TODO: check this
711
                val, exists := pod.Labels[CloneUniqueID]
×
712
                if exists && val == string(pvcUID)+common.ClonerSourcePodNameSuffix {
×
713
                        return &pod, nil
×
714
                }
×
715
        }
716
        return nil, errors.Errorf("Unable to find pod owned by UID: %s, in namespace: %s", string(pvcUID), namespace)
717
}
718

×
719
// AddVolumeDevices returns VolumeDevice slice with one block device for pods using PV with block volume mode
×
720
func AddVolumeDevices() []corev1.VolumeDevice {
×
721
        volumeDevices := []corev1.VolumeDevice{
×
722
                {
723
                        Name:       DataVolName,
×
724
                        DevicePath: common.WriteBlockPath,
725
                },
726
        }
727
        return volumeDevices
×
728
}
×
729

×
730
// GetPodsUsingPVCs returns Pods currently using PVCs
×
731
func GetPodsUsingPVCs(ctx context.Context, c client.Client, namespace string, names sets.Set[string], allowReadOnly bool) ([]corev1.Pod, error) {
×
732
        pl := &corev1.PodList{}
×
733
        // hopefully using cached client here
×
734
        err := c.List(ctx, pl, &client.ListOptions{Namespace: namespace})
×
735
        if err != nil {
×
736
                return nil, err
737
        }
738

×
739
        var pods []corev1.Pod
×
740
        for _, pod := range pl.Items {
×
741
                if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
×
742
                        continue
×
743
                }
×
744
                for _, volume := range pod.Spec.Volumes {
×
745
                        if volume.VolumeSource.PersistentVolumeClaim != nil &&
746
                                names.Has(volume.PersistentVolumeClaim.ClaimName) {
×
747
                                addPod := true
×
748
                                if allowReadOnly {
×
749
                                        if !volume.VolumeSource.PersistentVolumeClaim.ReadOnly {
×
750
                                                onlyReadOnly := true
751
                                                for _, c := range pod.Spec.Containers {
×
752
                                                        for _, vm := range c.VolumeMounts {
×
753
                                                                if vm.Name == volume.Name && !vm.ReadOnly {
×
754
                                                                        onlyReadOnly = false
×
755
                                                                }
×
756
                                                        }
×
757
                                                        for _, vm := range c.VolumeDevices {
×
758
                                                                if vm.Name == volume.Name {
×
759
                                                                        // Node level rw mount and container can't mount block device ro
×
760
                                                                        onlyReadOnly = false
×
761
                                                                }
×
762
                                                        }
×
763
                                                }
764
                                                if onlyReadOnly {
×
765
                                                        // no rw mounts
×
766
                                                        addPod = false
×
767
                                                }
×
768
                                        } else {
×
769
                                                // all mounts must be ro
770
                                                addPod = false
771
                                        }
×
772
                                        if strings.HasSuffix(pod.Name, common.ClonerSourcePodNameSuffix) && pod.Labels != nil &&
×
773
                                                pod.Labels[common.CDIComponentLabel] == common.ClonerSourcePodName {
×
774
                                                // Host assisted clone source pod only reads from source
×
775
                                                // But some drivers disallow mounting a block PVC ReadOnly
×
776
                                                addPod = false
×
777
                                        }
×
778
                                }
×
779
                                if addPod {
×
780
                                        pods = append(pods, pod)
×
781
                                        break
×
782
                                }
×
783
                        }
×
784
                }
×
785
        }
786

×
787
        return pods, nil
×
788
}
×
789

790
// GetWorkloadNodePlacement extracts the workload-specific nodeplacement values from the CDI CR
791
func GetWorkloadNodePlacement(ctx context.Context, c client.Client) (*sdkapi.NodePlacement, error) {
792
        cr, err := GetActiveCDI(ctx, c)
793
        if err != nil {
794
                return nil, err
×
795
        }
796

797
        if cr == nil {
798
                return nil, fmt.Errorf("no active CDI")
×
799
        }
×
800

×
801
        return &cr.Spec.Workloads, nil
×
802
}
×
803

804
// GetActiveCDI returns the active CDI CR
×
805
func GetActiveCDI(ctx context.Context, c client.Client) (*cdiv1.CDI, error) {
×
806
        crList := &cdiv1.CDIList{}
×
807
        if err := c.List(ctx, crList, &client.ListOptions{}); err != nil {
808
                return nil, err
×
809
        }
810

811
        if len(crList.Items) == 0 {
812
                return nil, nil
1✔
813
        }
1✔
814

1✔
815
        if len(crList.Items) == 1 {
×
816
                return &crList.Items[0], nil
×
817
        }
818

2✔
819
        var activeResources []cdiv1.CDI
1✔
820
        for _, cr := range crList.Items {
1✔
821
                if cr.Status.Phase != sdkapi.PhaseError {
822
                        activeResources = append(activeResources, cr)
2✔
823
                }
1✔
824
        }
1✔
825

826
        if len(activeResources) != 1 {
1✔
827
                return nil, fmt.Errorf("invalid number of active CDI resources: %d", len(activeResources))
2✔
828
        }
2✔
829

1✔
830
        return &activeResources[0], nil
1✔
831
}
832

833
// IsPopulated returns if the passed in PVC has been populated according to the rules outlined in pkg/apis/core/<version>/utils.go
2✔
834
func IsPopulated(pvc *corev1.PersistentVolumeClaim, c client.Client) (bool, error) {
1✔
835
        return cdiv1utils.IsPopulated(pvc, func(name, namespace string) (*cdiv1.DataVolume, error) {
1✔
836
                dv := &cdiv1.DataVolume{}
837
                err := c.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, dv)
1✔
838
                return dv, err
839
        })
840
}
841

×
842
// GetPreallocation returns the preallocation setting for the specified object (DV or VolumeImportSource), falling back to StorageClass and global setting (in this order)
×
843
func GetPreallocation(ctx context.Context, client client.Client, preallocation *bool) bool {
×
844
        // First, the DV's preallocation
×
845
        if preallocation != nil {
×
846
                return *preallocation
×
847
        }
848

849
        cdiconfig := &cdiv1.CDIConfig{}
850
        if err := client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiconfig); err != nil {
×
851
                klog.Errorf("Unable to find CDI configuration, %v\n", err)
×
852
                return defaultPreallocation
×
853
        }
×
854

×
855
        return cdiconfig.Status.Preallocation
856
}
×
857

×
858
// ImmediateBindingRequested returns if an object has the ImmediateBinding annotation
×
859
func ImmediateBindingRequested(obj metav1.Object) bool {
×
860
        _, isImmediateBindingRequested := obj.GetAnnotations()[AnnImmediateBinding]
×
861
        return isImmediateBindingRequested
862
}
×
863

864
// GetPriorityClass gets PVC priority class
865
func GetPriorityClass(pvc *corev1.PersistentVolumeClaim) string {
866
        anno := pvc.GetAnnotations()
×
867
        return anno[AnnPriorityClassName]
×
868
}
×
869

×
870
// GetPodServiceAccount gets PVC service account name
871
func GetPodServiceAccount(pvc *corev1.PersistentVolumeClaim) string {
872
        anno := pvc.GetAnnotations()
×
873
        return anno[AnnPodServiceAccount]
×
874
}
×
875

×
876
// ShouldDeletePod returns whether the PVC workload pod should be deleted
877
func ShouldDeletePod(pvc *corev1.PersistentVolumeClaim) bool {
878
        return pvc.GetAnnotations()[AnnPodRetainAfterCompletion] != "true" || pvc.GetAnnotations()[AnnRequiresScratch] == "true" || pvc.GetAnnotations()[AnnRequiresDirectIO] == "true" || pvc.DeletionTimestamp != nil
×
879
}
×
880

×
881
// AddFinalizer adds a finalizer to a resource
×
882
func AddFinalizer(obj metav1.Object, name string) {
883
        if HasFinalizer(obj, name) {
884
                return
×
885
        }
×
886

×
887
        obj.SetFinalizers(append(obj.GetFinalizers(), name))
888
}
889

×
890
// RemoveFinalizer removes a finalizer from a resource
×
891
func RemoveFinalizer(obj metav1.Object, name string) {
×
892
        if !HasFinalizer(obj, name) {
×
893
                return
894
        }
×
895

896
        var finalizers []string
897
        for _, f := range obj.GetFinalizers() {
898
                if f != name {
×
899
                        finalizers = append(finalizers, f)
×
900
                }
×
901
        }
×
902

903
        obj.SetFinalizers(finalizers)
×
904
}
×
905

×
906
// HasFinalizer returns true if a resource has a specific finalizer
×
907
func HasFinalizer(object metav1.Object, value string) bool {
×
908
        for _, f := range object.GetFinalizers() {
909
                if f == value {
910
                        return true
×
911
                }
912
        }
913
        return false
914
}
×
915

×
916
// ValidateCloneTokenPVC validates clone token for source and target PVCs
×
917
func ValidateCloneTokenPVC(t string, v token.Validator, source, target *corev1.PersistentVolumeClaim) error {
×
918
        if source.Namespace == target.Namespace {
×
919
                return nil
920
        }
×
921

922
        tokenData, err := v.Validate(t)
923
        if err != nil {
924
                return errors.Wrap(err, "error verifying token")
×
925
        }
×
926

×
927
        tokenResourceName := getTokenResourceNamePvc(source)
×
928
        srcName := getSourceNamePvc(source)
929

×
930
        return validateTokenData(tokenData, source.Namespace, srcName, target.Namespace, target.Name, string(target.UID), tokenResourceName)
×
931
}
×
932

×
933
// ValidateCloneTokenDV validates clone token for DV
934
func ValidateCloneTokenDV(validator token.Validator, dv *cdiv1.DataVolume) error {
×
935
        _, sourceName, sourceNamespace := GetCloneSourceInfo(dv)
×
936
        if sourceNamespace == "" || sourceNamespace == dv.Namespace {
×
937
                return nil
×
938
        }
939

940
        tok, ok := dv.Annotations[AnnCloneToken]
941
        if !ok {
×
942
                return errors.New("clone token missing")
×
943
        }
×
944

×
945
        tokenData, err := validator.Validate(tok)
×
946
        if err != nil {
947
                return errors.Wrap(err, "error verifying token")
×
948
        }
×
949

×
950
        tokenResourceName := getTokenResourceNameDataVolume(dv.Spec.Source)
×
951
        if tokenResourceName == "" {
952
                return errors.New("token resource name empty, can't verify properly")
×
953
        }
×
954

×
955
        return validateTokenData(tokenData, sourceNamespace, sourceName, dv.Namespace, dv.Name, "", tokenResourceName)
×
956
}
957

×
958
func getTokenResourceNameDataVolume(source *cdiv1.DataVolumeSource) string {
×
959
        if source.PVC != nil {
×
960
                return "persistentvolumeclaims"
×
961
        } else if source.Snapshot != nil {
962
                return "volumesnapshots"
×
963
        }
964

965
        return ""
×
966
}
×
967

×
968
func getTokenResourceNamePvc(sourcePvc *corev1.PersistentVolumeClaim) string {
×
969
        if v, ok := sourcePvc.Labels[common.CDIComponentLabel]; ok && v == common.CloneFromSnapshotFallbackPVCCDILabel {
×
970
                return "volumesnapshots"
×
971
        }
972

×
973
        return "persistentvolumeclaims"
974
}
975

×
976
func getSourceNamePvc(sourcePvc *corev1.PersistentVolumeClaim) string {
×
977
        if v, ok := sourcePvc.Labels[common.CDIComponentLabel]; ok && v == common.CloneFromSnapshotFallbackPVCCDILabel {
×
978
                if sourcePvc.Spec.DataSourceRef != nil {
×
979
                        return sourcePvc.Spec.DataSourceRef.Name
980
                }
×
981
        }
982

983
        return sourcePvc.Name
×
984
}
×
985

×
986
func validateTokenData(tokenData *token.Payload, srcNamespace, srcName, targetNamespace, targetName, targetUID, tokenResourceName string) error {
×
987
        uid := tokenData.Params["uid"]
×
988
        if tokenData.Operation != token.OperationClone ||
989
                tokenData.Name != srcName ||
990
                tokenData.Namespace != srcNamespace ||
×
991
                tokenData.Resource.Resource != tokenResourceName ||
992
                tokenData.Params["targetNamespace"] != targetNamespace ||
993
                tokenData.Params["targetName"] != targetName ||
×
994
                (uid != "" && uid != targetUID) {
×
995
                return errors.New("invalid token")
×
996
        }
×
997

×
998
        return nil
×
999
}
×
1000

×
1001
// IsSnapshotValidForClone returns an error if the passed snapshot is not valid for cloning
×
1002
func IsSnapshotValidForClone(sourceSnapshot *snapshotv1.VolumeSnapshot) error {
×
1003
        if sourceSnapshot.Status == nil {
×
1004
                return fmt.Errorf("no status on source snapshot yet")
1005
        }
×
1006
        if !IsSnapshotReady(sourceSnapshot) {
1007
                klog.V(3).Info("snapshot not ReadyToUse, while we allow this, probably going to be an issue going forward", "namespace", sourceSnapshot.Namespace, "name", sourceSnapshot.Name)
1008
        }
1009
        if sourceSnapshot.Status.Error != nil {
×
1010
                errMessage := "no details"
×
1011
                if msg := sourceSnapshot.Status.Error.Message; msg != nil {
×
1012
                        errMessage = *msg
×
1013
                }
×
1014
                return fmt.Errorf("snapshot in error state with msg: %s", errMessage)
×
1015
        }
×
1016
        if sourceSnapshot.Spec.VolumeSnapshotClassName == nil ||
×
1017
                *sourceSnapshot.Spec.VolumeSnapshotClassName == "" {
×
1018
                return fmt.Errorf("snapshot %s/%s does not have volume snap class populated, can't clone", sourceSnapshot.Name, sourceSnapshot.Namespace)
×
1019
        }
×
1020
        return nil
×
1021
}
×
1022

1023
// AddAnnotation adds an annotation to an object
×
1024
func AddAnnotation(obj metav1.Object, key, value string) {
×
1025
        if obj.GetAnnotations() == nil {
×
1026
                obj.SetAnnotations(make(map[string]string))
×
1027
        }
×
1028
        obj.GetAnnotations()[key] = value
1029
}
1030

1031
// AddLabel adds a label to an object
1✔
1032
func AddLabel(obj metav1.Object, key, value string) {
2✔
1033
        if obj.GetLabels() == nil {
1✔
1034
                obj.SetLabels(make(map[string]string))
1✔
1035
        }
1✔
1036
        obj.GetLabels()[key] = value
1037
}
1038

1039
// HandleFailedPod handles pod-creation errors and updates the pod's PVC without providing sensitive information
1✔
1040
func HandleFailedPod(err error, podName string, pvc *corev1.PersistentVolumeClaim, recorder record.EventRecorder, c client.Client) error {
2✔
1041
        if err == nil {
1✔
1042
                return nil
1✔
1043
        }
1✔
1044
        // Generic reason and msg to avoid providing sensitive information
1045
        reason := ErrStartingPod
1046
        msg := fmt.Sprintf(MessageErrStartingPod, podName)
1047

×
1048
        // Error handling to fine-tune the event with pertinent info
×
1049
        if ErrQuotaExceeded(err) {
×
1050
                reason = ErrExceededQuota
×
1051
        }
1052

×
1053
        recorder.Event(pvc, corev1.EventTypeWarning, reason, msg)
×
1054

×
1055
        if isCloneSourcePod := CreateCloneSourcePodName(pvc) == podName; isCloneSourcePod {
×
1056
                AddAnnotation(pvc, AnnSourceRunningCondition, "false")
×
1057
                AddAnnotation(pvc, AnnSourceRunningConditionReason, reason)
×
1058
                AddAnnotation(pvc, AnnSourceRunningConditionMessage, msg)
×
1059
        } else {
1060
                AddAnnotation(pvc, AnnRunningCondition, "false")
×
1061
                AddAnnotation(pvc, AnnRunningConditionReason, reason)
×
1062
                AddAnnotation(pvc, AnnRunningConditionMessage, msg)
×
1063
        }
×
1064

×
1065
        AddAnnotation(pvc, AnnPodPhase, string(corev1.PodFailed))
×
1066
        if err := c.Update(context.TODO(), pvc); err != nil {
×
1067
                return err
×
1068
        }
×
1069

×
1070
        return err
×
1071
}
1072

×
1073
// GetSource returns the source string which determines the type of source. If no source or invalid source found, default to http
×
1074
func GetSource(pvc *corev1.PersistentVolumeClaim) string {
×
1075
        source, found := pvc.Annotations[AnnSource]
×
1076
        if !found {
1077
                source = ""
×
1078
        }
1079
        switch source {
1080
        case
1081
                SourceHTTP,
×
1082
                SourceS3,
×
1083
                SourceGCS,
×
1084
                SourceGlance,
×
1085
                SourceNone,
×
1086
                SourceRegistry,
×
1087
                SourceImageio,
1088
                SourceVDDK:
1089
        default:
1090
                source = SourceHTTP
1091
        }
1092
        return source
1093
}
1094

1095
// GetEndpoint returns the endpoint string which contains the full path URI of the target object to be copied.
×
1096
func GetEndpoint(pvc *corev1.PersistentVolumeClaim) (string, error) {
×
1097
        ep, found := pvc.Annotations[AnnEndpoint]
×
1098
        if !found || ep == "" {
1099
                verb := "empty"
×
1100
                if !found {
1101
                        verb = "missing"
1102
                }
1103
                return ep, errors.Errorf("annotation %q in pvc \"%s/%s\" is %s", AnnEndpoint, pvc.Namespace, pvc.Name, verb)
×
1104
        }
×
1105
        return ep, nil
×
1106
}
×
1107

×
1108
// AddImportVolumeMounts is being called for pods using PV with filesystem volume mode
×
1109
func AddImportVolumeMounts() []corev1.VolumeMount {
×
1110
        volumeMounts := []corev1.VolumeMount{
×
1111
                {
1112
                        Name:      DataVolName,
×
1113
                        MountPath: common.ImporterDataDir,
1114
                },
1115
        }
1116
        return volumeMounts
×
1117
}
×
1118

×
1119
// GetEffectiveStorageResources returns the maximum of the passed storageResources and the storageProfile minimumSupportedPVCSize.
×
1120
// If the passed storageResources has no size, it is returned as-is.
×
1121
func GetEffectiveStorageResources(ctx context.Context, client client.Client, storageResources corev1.VolumeResourceRequirements,
×
1122
        storageClassName *string, contentType cdiv1.DataVolumeContentType, log logr.Logger) (corev1.VolumeResourceRequirements, error) {
×
1123
        sc, err := GetStorageClassByNameWithVirtFallback(ctx, client, storageClassName, contentType)
×
1124
        if err != nil || sc == nil {
×
1125
                return storageResources, err
1126
        }
1127

1128
        requestedSize, hasSize := storageResources.Requests[corev1.ResourceStorage]
1129
        if !hasSize {
×
1130
                return storageResources, nil
×
1131
        }
×
1132

×
1133
        if requestedSize, err = GetEffectiveVolumeSize(ctx, client, requestedSize, sc.Name, &log); err != nil {
×
1134
                return storageResources, err
1135
        }
×
1136

×
1137
        return corev1.VolumeResourceRequirements{
×
1138
                Requests: corev1.ResourceList{
×
1139
                        corev1.ResourceStorage: requestedSize,
1140
                },
×
1141
        }, nil
×
1142
}
×
1143

1144
// GetEffectiveVolumeSize returns the maximum of the passed requestedSize and the storageProfile minimumSupportedPVCSize.
×
1145
func GetEffectiveVolumeSize(ctx context.Context, client client.Client, requestedSize resource.Quantity, storageClassName string, log *logr.Logger) (resource.Quantity, error) {
×
1146
        storageProfile := &cdiv1.StorageProfile{}
×
1147
        if err := client.Get(ctx, types.NamespacedName{Name: storageClassName}, storageProfile); err != nil {
×
1148
                return requestedSize, IgnoreNotFound(err)
×
1149
        }
1150

1151
        if val, exists := storageProfile.Annotations[AnnMinimumSupportedPVCSize]; exists {
1152
                if minSize, err := resource.ParseQuantity(val); err == nil {
×
1153
                        if requestedSize.Cmp(minSize) == -1 {
×
1154
                                return minSize, nil
×
1155
                        }
×
1156
                } else if log != nil {
×
1157
                        log.V(1).Info("Invalid minimum PVC size in annotation", "value", val, "error", err)
1158
                }
×
1159
        }
×
1160

×
1161
        return requestedSize, nil
×
1162
}
×
1163

×
1164
// ValidateRequestedCloneSize validates the clone size requirements on block
×
1165
func ValidateRequestedCloneSize(sourceResources, targetResources corev1.VolumeResourceRequirements) error {
×
1166
        sourceRequest, hasSource := sourceResources.Requests[corev1.ResourceStorage]
1167
        targetRequest, hasTarget := targetResources.Requests[corev1.ResourceStorage]
1168
        if !hasSource || !hasTarget {
×
1169
                return errors.New("source/target missing storage resource requests")
1170
        }
1171

1172
        // Verify that the target PVC size is equal or larger than the source.
×
1173
        if sourceRequest.Value() > targetRequest.Value() {
×
1174
                return errors.Errorf("target resources requests storage size is smaller than the source %d < %d", targetRequest.Value(), sourceRequest.Value())
×
1175
        }
×
1176
        return nil
×
1177
}
×
1178

1179
// CreateCloneSourcePodName creates clone source pod name
1180
func CreateCloneSourcePodName(targetPvc *corev1.PersistentVolumeClaim) string {
×
1181
        return string(targetPvc.GetUID()) + common.ClonerSourcePodNameSuffix
×
1182
}
×
1183

×
1184
// IsPVCComplete returns true if a PVC is in 'Succeeded' phase, false if not
1185
func IsPVCComplete(pvc *corev1.PersistentVolumeClaim) bool {
1186
        if pvc != nil {
1187
                phase, exists := pvc.ObjectMeta.Annotations[AnnPodPhase]
×
1188
                return exists && (phase == string(corev1.PodSucceeded))
×
1189
        }
×
1190
        return false
1191
}
1192

×
1193
// IsMultiStageImportInProgress returns true when a PVC is being part of an ongoing multi-stage import
×
1194
func IsMultiStageImportInProgress(pvc *corev1.PersistentVolumeClaim) bool {
×
1195
        if pvc != nil {
×
1196
                multiStageImport := metav1.HasAnnotation(pvc.ObjectMeta, AnnCurrentCheckpoint)
×
1197
                multiStageAlreadyDone := metav1.HasAnnotation(pvc.ObjectMeta, AnnMultiStageImportDone)
×
1198
                return multiStageImport && !multiStageAlreadyDone
1199
        }
1200
        return false
1201
}
×
1202

×
1203
// SetRestrictedSecurityContext sets the pod security params to be compatible with restricted PSA
×
1204
func SetRestrictedSecurityContext(podSpec *corev1.PodSpec) {
×
1205
        hasVolumeMounts := false
×
1206
        for _, containers := range [][]corev1.Container{podSpec.InitContainers, podSpec.Containers} {
×
1207
                for i := range containers {
×
1208
                        container := &containers[i]
1209
                        if container.SecurityContext == nil {
1210
                                container.SecurityContext = &corev1.SecurityContext{}
1211
                        }
×
1212
                        container.SecurityContext.Capabilities = &corev1.Capabilities{
×
1213
                                Drop: []corev1.Capability{
×
1214
                                        "ALL",
×
1215
                                },
×
1216
                        }
×
1217
                        container.SecurityContext.SeccompProfile = &corev1.SeccompProfile{
×
1218
                                Type: corev1.SeccompProfileTypeRuntimeDefault,
×
1219
                        }
×
1220
                        container.SecurityContext.AllowPrivilegeEscalation = ptr.To[bool](false)
×
1221
                        container.SecurityContext.RunAsNonRoot = ptr.To[bool](true)
×
1222
                        container.SecurityContext.RunAsUser = ptr.To[int64](common.QemuSubGid)
×
1223
                        if len(container.VolumeMounts) > 0 {
×
1224
                                hasVolumeMounts = true
×
1225
                        }
×
1226
                }
×
1227
        }
×
1228

×
1229
        if podSpec.SecurityContext == nil {
×
1230
                podSpec.SecurityContext = &corev1.PodSecurityContext{}
×
1231
        }
×
1232
        // Some tools like istio inject containers and thus rely on a pod level seccomp profile being specified
×
1233
        podSpec.SecurityContext.SeccompProfile = &corev1.SeccompProfile{
1234
                Type: corev1.SeccompProfileTypeRuntimeDefault,
1235
        }
1236
        if hasVolumeMounts {
×
1237
                podSpec.SecurityContext.FSGroup = ptr.To[int64](common.QemuSubGid)
×
1238
        }
×
1239
}
1240

×
1241
// SetNodeNameIfPopulator sets NodeName in a pod spec when the PVC is being handled by a CDI volume populator
×
1242
func SetNodeNameIfPopulator(pvc *corev1.PersistentVolumeClaim, podSpec *corev1.PodSpec) {
×
1243
        _, isPopulator := pvc.Annotations[AnnPopulatorKind]
×
1244
        nodeName := pvc.Annotations[AnnSelectedNode]
×
1245
        if isPopulator && nodeName != "" {
×
1246
                podSpec.NodeName = nodeName
1247
        }
1248
}
1249

×
1250
// CreatePvc creates PVC
×
1251
func CreatePvc(name, ns string, annotations, labels map[string]string) *corev1.PersistentVolumeClaim {
×
1252
        return CreatePvcInStorageClass(name, ns, nil, annotations, labels, corev1.ClaimBound)
×
1253
}
×
1254

×
1255
// CreatePvcInStorageClass creates PVC with storgae class
1256
func CreatePvcInStorageClass(name, ns string, storageClassName *string, annotations, labels map[string]string, phase corev1.PersistentVolumeClaimPhase) *corev1.PersistentVolumeClaim {
1257
        pvc := &corev1.PersistentVolumeClaim{
1258
                ObjectMeta: metav1.ObjectMeta{
1✔
1259
                        Name:        name,
1✔
1260
                        Namespace:   ns,
1✔
1261
                        Annotations: annotations,
1262
                        Labels:      labels,
1263
                        UID:         types.UID(ns + "-" + name),
1✔
1264
                },
1✔
1265
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1266
                        AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadOnlyMany, corev1.ReadWriteOnce},
1✔
1267
                        Resources: corev1.VolumeResourceRequirements{
1✔
1268
                                Requests: corev1.ResourceList{
1✔
1269
                                        corev1.ResourceStorage: resource.MustParse("1G"),
1✔
1270
                                },
1✔
1271
                        },
1✔
1272
                        StorageClassName: storageClassName,
1✔
1273
                },
1✔
1274
                Status: corev1.PersistentVolumeClaimStatus{
1✔
1275
                        Phase: phase,
1✔
1276
                },
1✔
1277
        }
1✔
1278
        pvc.Status.Capacity = pvc.Spec.Resources.Requests.DeepCopy()
1✔
1279
        if pvc.Status.Phase == corev1.ClaimBound {
1✔
1280
                pvc.Spec.VolumeName = "pv-" + string(pvc.UID)
1✔
1281
        }
1✔
1282
        return pvc
1✔
1283
}
1✔
1284

1✔
1285
// GetAPIServerKey returns API server RSA key
1✔
1286
func GetAPIServerKey() *rsa.PrivateKey {
2✔
1287
        apiServerKeyOnce.Do(func() {
1✔
1288
                apiServerKey, _ = rsa.GenerateKey(rand.Reader, 2048)
1✔
1289
        })
1✔
1290
        return apiServerKey
1291
}
1292

1293
// CreateStorageClass creates storage class CR
×
1294
func CreateStorageClass(name string, annotations map[string]string) *storagev1.StorageClass {
×
1295
        return &storagev1.StorageClass{
×
1296
                ObjectMeta: metav1.ObjectMeta{
×
1297
                        Name:        name,
×
1298
                        Annotations: annotations,
1299
                },
1300
        }
1301
}
1✔
1302

1✔
1303
// CreateImporterTestPod creates importer test pod CR
1✔
1304
func CreateImporterTestPod(pvc *corev1.PersistentVolumeClaim, dvname string, scratchPvc *corev1.PersistentVolumeClaim) *corev1.Pod {
1✔
1305
        // importer pod name contains the pvc name
1✔
1306
        podName := fmt.Sprintf("%s-%s", common.ImporterPodName, pvc.Name)
1✔
1307

1✔
1308
        blockOwnerDeletion := true
1✔
1309
        isController := true
1310

1311
        volumes := []corev1.Volume{
×
1312
                {
×
1313
                        Name: dvname,
×
1314
                        VolumeSource: corev1.VolumeSource{
×
1315
                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
×
1316
                                        ClaimName: pvc.Name,
×
1317
                                        ReadOnly:  false,
×
1318
                                },
×
1319
                        },
×
1320
                },
×
1321
        }
×
1322

×
1323
        if scratchPvc != nil {
×
1324
                volumes = append(volumes, corev1.Volume{
×
1325
                        Name: ScratchVolName,
×
1326
                        VolumeSource: corev1.VolumeSource{
×
1327
                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
×
1328
                                        ClaimName: scratchPvc.Name,
×
1329
                                        ReadOnly:  false,
×
1330
                                },
×
1331
                        },
×
1332
                })
×
1333
        }
×
1334

×
1335
        pod := &corev1.Pod{
×
1336
                TypeMeta: metav1.TypeMeta{
×
1337
                        Kind:       "Pod",
×
1338
                        APIVersion: "v1",
×
1339
                },
×
1340
                ObjectMeta: metav1.ObjectMeta{
×
1341
                        Name:      podName,
1342
                        Namespace: pvc.Namespace,
×
1343
                        Annotations: map[string]string{
×
1344
                                AnnCreatedBy: "yes",
×
1345
                        },
×
1346
                        Labels: map[string]string{
×
1347
                                common.CDILabelKey:        common.CDILabelValue,
×
1348
                                common.CDIComponentLabel:  common.ImporterPodName,
×
1349
                                common.PrometheusLabelKey: common.PrometheusLabelValue,
×
1350
                        },
×
1351
                        OwnerReferences: []metav1.OwnerReference{
×
1352
                                {
×
1353
                                        APIVersion:         "v1",
×
1354
                                        Kind:               "PersistentVolumeClaim",
×
1355
                                        Name:               pvc.Name,
×
1356
                                        UID:                pvc.GetUID(),
×
1357
                                        BlockOwnerDeletion: &blockOwnerDeletion,
×
1358
                                        Controller:         &isController,
×
1359
                                },
×
1360
                        },
×
1361
                },
×
1362
                Spec: corev1.PodSpec{
×
1363
                        Containers: []corev1.Container{
×
1364
                                {
×
1365
                                        Name:            common.ImporterPodName,
×
1366
                                        Image:           "test/myimage",
×
1367
                                        ImagePullPolicy: corev1.PullPolicy("Always"),
×
1368
                                        Args:            []string{"-v=5"},
×
1369
                                        Ports: []corev1.ContainerPort{
×
1370
                                                {
×
1371
                                                        Name:          "metrics",
×
1372
                                                        ContainerPort: 8443,
×
1373
                                                        Protocol:      corev1.ProtocolTCP,
×
1374
                                                },
×
1375
                                        },
×
1376
                                },
×
1377
                        },
×
1378
                        RestartPolicy:      corev1.RestartPolicyOnFailure,
×
1379
                        Volumes:            volumes,
×
1380
                        EnableServiceLinks: ptr.To(false),
×
1381
                },
×
1382
        }
×
1383

×
1384
        ep, _ := GetEndpoint(pvc)
×
1385
        source := GetSource(pvc)
×
1386
        contentType := GetPVCContentType(pvc)
×
1387
        imageSize, _ := GetRequestedImageSize(pvc)
×
1388
        volumeMode := GetVolumeMode(pvc)
×
1389

×
1390
        env := []corev1.EnvVar{
×
1391
                {
×
1392
                        Name:  common.ImporterSource,
×
1393
                        Value: source,
×
1394
                },
×
1395
                {
×
1396
                        Name:  common.ImporterEndpoint,
×
1397
                        Value: ep,
×
1398
                },
×
1399
                {
×
1400
                        Name:  common.ImporterContentType,
×
1401
                        Value: string(contentType),
×
1402
                },
×
1403
                {
×
1404
                        Name:  common.ImporterImageSize,
×
1405
                        Value: imageSize,
×
1406
                },
×
1407
                {
×
1408
                        Name:  common.OwnerUID,
×
1409
                        Value: string(pvc.UID),
×
1410
                },
×
1411
                {
×
1412
                        Name:  common.InsecureTLSVar,
×
1413
                        Value: "false",
×
1414
                },
×
1415
        }
×
1416
        pod.Spec.Containers[0].Env = env
×
1417
        if volumeMode == corev1.PersistentVolumeBlock {
×
1418
                pod.Spec.Containers[0].VolumeDevices = AddVolumeDevices()
×
1419
        } else {
×
1420
                pod.Spec.Containers[0].VolumeMounts = AddImportVolumeMounts()
×
1421
        }
×
1422

×
1423
        if scratchPvc != nil {
×
1424
                pod.Spec.Containers[0].VolumeMounts = append(pod.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{
×
1425
                        Name:      ScratchVolName,
×
1426
                        MountPath: common.ScratchDataDir,
×
1427
                })
×
1428
        }
×
1429

1430
        return pod
×
1431
}
×
1432

×
1433
// CreateStorageClassWithProvisioner creates CR of storage class with provisioner
×
1434
func CreateStorageClassWithProvisioner(name string, annotations, labels map[string]string, provisioner string) *storagev1.StorageClass {
×
1435
        return &storagev1.StorageClass{
×
1436
                Provisioner: provisioner,
1437
                ObjectMeta: metav1.ObjectMeta{
×
1438
                        Name:        name,
1439
                        Annotations: annotations,
1440
                        Labels:      labels,
1441
                },
×
1442
        }
×
1443
}
×
1444

×
1445
// CreateClient creates a fake client
×
1446
func CreateClient(objs ...runtime.Object) client.Client {
×
1447
        s := scheme.Scheme
×
1448
        _ = cdiv1.AddToScheme(s)
×
1449
        _ = corev1.AddToScheme(s)
×
1450
        _ = storagev1.AddToScheme(s)
×
1451
        _ = ocpconfigv1.Install(s)
1452

1453
        return fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objs...).Build()
1✔
1454
}
1✔
1455

1✔
1456
// ErrQuotaExceeded reports whether err is an exceeded-quota error, detected by
// the "exceeded quota:" marker in its message. A nil error yields false
// instead of panicking.
func ErrQuotaExceeded(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "exceeded quota:")
}
1✔
1460

1✔
1461
// GetContentType returns the content type. If invalid or not set, default to kubevirt
1✔
1462
func GetContentType(contentType cdiv1.DataVolumeContentType) cdiv1.DataVolumeContentType {
1463
        switch contentType {
1464
        case
×
1465
                cdiv1.DataVolumeKubeVirt,
×
1466
                cdiv1.DataVolumeArchive:
×
1467
        default:
1468
                // TODO - shouldn't archive be the default?
1469
                contentType = cdiv1.DataVolumeKubeVirt
1✔
1470
        }
1✔
1471
        return contentType
1472
}
1473

1✔
1474
// GetPVCContentType returns the content type of the source image. If invalid or not set, default to kubevirt
×
1475
func GetPVCContentType(pvc *corev1.PersistentVolumeClaim) cdiv1.DataVolumeContentType {
×
1476
        contentType, found := pvc.Annotations[AnnContentType]
×
1477
        if !found {
1478
                // TODO - shouldn't archive be the default?
1✔
1479
                return cdiv1.DataVolumeKubeVirt
1480
        }
1481

1482
        return GetContentType(cdiv1.DataVolumeContentType(contentType))
×
1483
}
×
1484

×
1485
// GetNamespace returns namespace unless it is empty, in which case
// defaultNamespace is returned.
func GetNamespace(namespace, defaultNamespace string) string {
	if namespace != "" {
		return namespace
	}
	return defaultNamespace
}
1492

1493
// IsErrCacheNotStarted checked is the error is of cache not started
×
1494
func IsErrCacheNotStarted(err error) bool {
×
1495
        target := &runtimecache.ErrCacheNotStarted{}
×
1496
        return errors.As(err, &target)
×
1497
}
×
1498

1499
// NewImportDataVolume returns new import DataVolume CR
1500
func NewImportDataVolume(name string) *cdiv1.DataVolume {
1501
        return &cdiv1.DataVolume{
×
1502
                TypeMeta: metav1.TypeMeta{APIVersion: cdiv1.SchemeGroupVersion.String()},
×
1503
                ObjectMeta: metav1.ObjectMeta{
×
1504
                        Name:      name,
×
1505
                        Namespace: metav1.NamespaceDefault,
1506
                        UID:       types.UID(metav1.NamespaceDefault + "-" + name),
1507
                },
×
1508
                Spec: cdiv1.DataVolumeSpec{
×
1509
                        Source: &cdiv1.DataVolumeSource{
×
1510
                                HTTP: &cdiv1.DataVolumeSourceHTTP{
×
1511
                                        URL: "http://example.com/data",
×
1512
                                },
×
1513
                        },
×
1514
                        PVC: &corev1.PersistentVolumeClaimSpec{
×
1515
                                AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
×
1516
                        },
×
1517
                        PriorityClassName: "p0",
×
1518
                },
×
1519
        }
×
1520
}
×
1521

×
1522
// GetCloneSourceInfo returns the type, name and namespace of the cloning source
×
1523
func GetCloneSourceInfo(dv *cdiv1.DataVolume) (sourceType, sourceName, sourceNamespace string) {
×
1524
        // Cloning sources are mutually exclusive
×
1525
        if dv.Spec.Source.PVC != nil {
×
1526
                return "pvc", dv.Spec.Source.PVC.Name, dv.Spec.Source.PVC.Namespace
×
1527
        }
×
1528

1529
        if dv.Spec.Source.Snapshot != nil {
1530
                return "snapshot", dv.Spec.Source.Snapshot.Name, dv.Spec.Source.Snapshot.Namespace
×
1531
        }
×
1532

×
1533
        return "", "", ""
×
1534
}
×
1535

1536
// IsWaitForFirstConsumerEnabled tells us if we should respect "real" WFFC behavior or just let our worker pods randomly spawn
×
1537
func IsWaitForFirstConsumerEnabled(obj metav1.Object, gates featuregates.FeatureGates) (bool, error) {
×
1538
        // when PVC requests immediateBinding it cannot honor wffc logic
×
1539
        isImmediateBindingRequested := ImmediateBindingRequested(obj)
1540
        pvcHonorWaitForFirstConsumer := !isImmediateBindingRequested
×
1541
        globalHonorWaitForFirstConsumer, err := gates.HonorWaitForFirstConsumerEnabled()
1542
        if err != nil {
1543
                return false, err
1544
        }
×
1545

×
1546
        return pvcHonorWaitForFirstConsumer && globalHonorWaitForFirstConsumer, nil
×
1547
}
×
1548

×
1549
// AddImmediateBindingAnnotationIfWFFCDisabled adds the immediateBinding annotation if wffc feature gate is disabled
×
1550
func AddImmediateBindingAnnotationIfWFFCDisabled(obj metav1.Object, gates featuregates.FeatureGates) error {
×
1551
        globalHonorWaitForFirstConsumer, err := gates.HonorWaitForFirstConsumerEnabled()
×
1552
        if err != nil {
1553
                return err
×
1554
        }
1555
        if !globalHonorWaitForFirstConsumer {
1556
                AddAnnotation(obj, AnnImmediateBinding, "")
1557
        }
×
1558
        return nil
×
1559
}
×
1560

×
1561
// InflateSizeWithOverhead inflates a storage size with proper overhead calculations
×
1562
func InflateSizeWithOverhead(ctx context.Context, c client.Client, imgSize int64, pvcSpec *corev1.PersistentVolumeClaimSpec) (resource.Quantity, error) {
×
1563
        var returnSize resource.Quantity
×
1564

×
1565
        if util.ResolveVolumeMode(pvcSpec.VolumeMode) == corev1.PersistentVolumeFilesystem {
×
1566
                fsOverhead, err := GetFilesystemOverheadForStorageClass(ctx, c, pvcSpec.StorageClassName)
1567
                if err != nil {
1568
                        return resource.Quantity{}, err
1569
                }
×
1570
                // Parse filesystem overhead (percentage) into a 64-bit float
×
1571
                fsOverheadFloat, _ := strconv.ParseFloat(string(fsOverhead), 64)
×
1572

×
1573
                // Merge the previous values into a 'resource.Quantity' struct
×
1574
                requiredSpace := util.GetRequiredSpace(fsOverheadFloat, imgSize)
×
1575
                returnSize = *resource.NewScaledQuantity(requiredSpace, 0)
×
1576
        } else {
×
1577
                // Inflation is not needed with 'Block' mode
1578
                returnSize = *resource.NewScaledQuantity(imgSize, 0)
×
1579
        }
×
1580

×
1581
        return returnSize, nil
×
1582
}
×
1583

×
1584
// IsBound returns if the pvc is bound
×
1585
func IsBound(pvc *corev1.PersistentVolumeClaim) bool {
×
1586
        return pvc != nil && pvc.Status.Phase == corev1.ClaimBound
×
1587
}
1588

×
1589
// IsUnbound returns if the pvc is not bound yet
1590
func IsUnbound(pvc *corev1.PersistentVolumeClaim) bool {
1591
        return !IsBound(pvc)
1592
}
×
1593

×
1594
// IsLost returns if the pvc is lost
×
1595
func IsLost(pvc *corev1.PersistentVolumeClaim) bool {
×
1596
        return pvc != nil && pvc.Status.Phase == corev1.ClaimLost
×
1597
}
×
1598

×
1599
// IsImageStream returns true if registry source is ImageStream
×
1600
func IsImageStream(pvc *corev1.PersistentVolumeClaim) bool {
×
1601
        return pvc.Annotations[AnnRegistryImageStream] == "true"
1602
}
1603

1604
// ShouldIgnorePod checks if a pod should be ignored.
1605
// If this is a completed pod that was used for one checkpoint of a multi-stage import, it
1606
// should be ignored by pod lookups as long as the retainAfterCompletion annotation is set.
1607
func ShouldIgnorePod(pod *corev1.Pod, pvc *corev1.PersistentVolumeClaim) bool {
1608
        retain := pvc.ObjectMeta.Annotations[AnnPodRetainAfterCompletion]
×
1609
        checkpoint := pvc.ObjectMeta.Annotations[AnnCurrentCheckpoint]
×
1610
        if checkpoint != "" && pod.Status.Phase == corev1.PodSucceeded {
×
1611
                return retain == "true"
×
1612
        }
1613
        return false
×
1614
}
×
1615

×
1616
// BuildHTTPClient generates an http client that accepts any certificate, since we are using
// it to get prometheus data it doesn't matter if someone can intercept the data. Once we have
// a mechanism to properly sign the server, we can update this method to get a proper client.
//
// A non-nil httpClient is returned unchanged. Otherwise a new client is built
// whose transport skips TLS verification and copies proxy, dialer and timeout
// settings from http.DefaultTransport. If http.DefaultTransport has been
// replaced by something other than *http.Transport, those settings are left at
// their zero values instead of panicking on the type assertion.
func BuildHTTPClient(httpClient *http.Client) *http.Client {
	if httpClient != nil {
		return httpClient
	}

	// Create new Transport that ignores self-signed SSL
	//nolint:gosec
	tr := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	if defaultTransport, ok := http.DefaultTransport.(*http.Transport); ok {
		tr.Proxy = defaultTransport.Proxy
		tr.DialContext = defaultTransport.DialContext
		tr.MaxIdleConns = defaultTransport.MaxIdleConns
		tr.IdleConnTimeout = defaultTransport.IdleConnTimeout
		tr.ExpectContinueTimeout = defaultTransport.ExpectContinueTimeout
		tr.TLSHandshakeTimeout = defaultTransport.TLSHandshakeTimeout
	}

	return &http.Client{Transport: tr}
}
×
1639

×
1640
// ErrConnectionRefused reports whether err is a connection-refused error,
// detected by the "connection refused" marker in its message. A nil error
// yields false instead of panicking.
func ErrConnectionRefused(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "connection refused")
}
1644

1645
// GetPodMetricsPort returns, if exists, the metrics port from the passed pod
×
1646
func GetPodMetricsPort(pod *corev1.Pod) (int, error) {
×
1647
        for _, container := range pod.Spec.Containers {
×
1648
                for _, port := range container.Ports {
×
1649
                        if port.Name == "metrics" {
×
1650
                                return int(port.ContainerPort), nil
×
1651
                        }
×
1652
                }
1653
        }
1654
        return 0, errors.New("Metrics port not found in pod")
1655
}
1656

1657
// GetMetricsURL builds the metrics URL according to the specified pod
×
1658
func GetMetricsURL(pod *corev1.Pod) (string, error) {
×
1659
        if pod == nil {
×
1660
                return "", nil
×
1661
        }
×
1662
        port, err := GetPodMetricsPort(pod)
×
1663
        if err != nil || pod.Status.PodIP == "" {
×
1664
                return "", err
×
1665
        }
×
1666
        domain := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(port))
×
1667
        url := fmt.Sprintf("https://%s/metrics", domain)
×
1668
        return url, nil
×
1669
}
×
1670

×
1671
// GetProgressReportFromURL fetches the progress report from the passed URL according to an specific metric expression and ownerUID
×
1672
func GetProgressReportFromURL(ctx context.Context, url string, httpClient *http.Client, metricExp, ownerUID string) (string, error) {
×
1673
        regExp := regexp.MustCompile(fmt.Sprintf("(%s)\\{ownerUID\\=%q\\} (\\d{1,3}\\.?\\d*)", metricExp, ownerUID))
×
1674
        // pod could be gone, don't block an entire thread for 30 seconds
×
1675
        // just to get back an i/o timeout
×
1676
        ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
1677
        defer cancel()
1678
        req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
1679
        if err != nil {
×
1680
                return "", err
×
1681
        }
×
1682
        resp, err := httpClient.Do(req)
1683
        if err != nil {
1684
                if ErrConnectionRefused(err) {
1✔
1685
                        return "", nil
2✔
1686
                }
2✔
1687
                return "", err
2✔
1688
        }
1✔
1689
        defer resp.Body.Close()
1✔
1690
        body, err := io.ReadAll(resp.Body)
1691
        if err != nil {
1692
                return "", err
1✔
1693
        }
1694

1695
        // Parse the progress from the body
1696
        progressReport := ""
1✔
1697
        match := regExp.FindStringSubmatch(string(body))
1✔
1698
        if match != nil {
×
1699
                progressReport = match[len(match)-1]
×
1700
        }
1✔
1701
        return progressReport, nil
2✔
1702
}
1✔
1703

1✔
1704
// UpdateHTTPAnnotations updates the passed annotations for proper http import
1✔
1705
func UpdateHTTPAnnotations(annotations map[string]string, http *cdiv1.DataVolumeSourceHTTP) {
1✔
1706
        annotations[AnnEndpoint] = http.URL
1✔
1707
        annotations[AnnSource] = SourceHTTP
1708

1709
        if http.SecretRef != "" {
1710
                annotations[AnnSecret] = http.SecretRef
×
1711
        }
×
1712
        if http.CertConfigMap != "" {
×
1713
                annotations[AnnCertConfigMap] = http.CertConfigMap
×
1714
        }
×
1715
        for index, header := range http.ExtraHeaders {
×
1716
                annotations[fmt.Sprintf("%s.%d", AnnExtraHeaders, index)] = header
×
1717
        }
×
1718
        for index, header := range http.SecretExtraHeaders {
×
1719
                annotations[fmt.Sprintf("%s.%d", AnnSecretExtraHeaders, index)] = header
×
1720
        }
×
1721
}
×
1722

×
1723
// UpdateS3Annotations updates the passed annotations for proper S3 import
×
1724
func UpdateS3Annotations(annotations map[string]string, s3 *cdiv1.DataVolumeSourceS3) {
×
1725
        annotations[AnnEndpoint] = s3.URL
×
1726
        annotations[AnnSource] = SourceS3
1727
        if s3.SecretRef != "" {
×
1728
                annotations[AnnSecret] = s3.SecretRef
×
1729
        }
×
1730
        if s3.CertConfigMap != "" {
×
1731
                annotations[AnnCertConfigMap] = s3.CertConfigMap
×
1732
        }
1733
}
1734

×
1735
// UpdateGCSAnnotations updates the passed annotations for proper GCS import
×
1736
func UpdateGCSAnnotations(annotations map[string]string, gcs *cdiv1.DataVolumeSourceGCS) {
×
1737
        annotations[AnnEndpoint] = gcs.URL
×
1738
        annotations[AnnSource] = SourceGCS
×
1739
        if gcs.SecretRef != "" {
×
1740
                annotations[AnnSecret] = gcs.SecretRef
1741
        }
1742
}
1743

×
1744
// UpdateRegistryAnnotations updates the passed annotations for proper registry import
×
1745
func UpdateRegistryAnnotations(annotations map[string]string, registry *cdiv1.DataVolumeSourceRegistry) {
×
1746
        annotations[AnnSource] = SourceRegistry
×
1747
        pullMethod := registry.PullMethod
×
1748
        if pullMethod != nil && *pullMethod != "" {
×
1749
                annotations[AnnRegistryImportMethod] = string(*pullMethod)
×
1750
        }
×
1751
        url := registry.URL
×
1752
        if url != nil && *url != "" {
×
1753
                annotations[AnnEndpoint] = *url
×
1754
        } else {
×
1755
                imageStream := registry.ImageStream
×
1756
                if imageStream != nil && *imageStream != "" {
×
1757
                        annotations[AnnEndpoint] = *imageStream
×
1758
                        annotations[AnnRegistryImageStream] = "true"
×
1759
                }
×
1760
        }
×
1761
        secretRef := registry.SecretRef
×
1762
        if secretRef != nil && *secretRef != "" {
1763
                annotations[AnnSecret] = *secretRef
1764
        }
1765
        certConfigMap := registry.CertConfigMap
×
1766
        if certConfigMap != nil && *certConfigMap != "" {
×
1767
                annotations[AnnCertConfigMap] = *certConfigMap
×
1768
        }
×
1769

×
1770
        if registry.Platform != nil && registry.Platform.Architecture != "" {
×
1771
                annotations[AnnRegistryImageArchitecture] = registry.Platform.Architecture
×
1772
        }
×
1773
}
×
1774

1775
// UpdateVDDKAnnotations updates the passed annotations for proper VDDK import
1776
func UpdateVDDKAnnotations(annotations map[string]string, vddk *cdiv1.DataVolumeSourceVDDK) {
1777
        annotations[AnnEndpoint] = vddk.URL
×
1778
        annotations[AnnSource] = SourceVDDK
×
1779
        annotations[AnnSecret] = vddk.SecretRef
×
1780
        annotations[AnnBackingFile] = vddk.BackingFile
×
1781
        annotations[AnnUUID] = vddk.UUID
×
1782
        annotations[AnnThumbprint] = vddk.Thumbprint
×
1783
        if vddk.InitImageURL != "" {
1784
                annotations[AnnVddkInitImageURL] = vddk.InitImageURL
1785
        }
1786
        if vddk.ExtraArgs != "" {
×
1787
                annotations[AnnVddkExtraArgs] = vddk.ExtraArgs
×
1788
        }
×
1789
}
×
1790

×
1791
// UpdateImageIOAnnotations updates the passed annotations for proper imageIO import
×
1792
func UpdateImageIOAnnotations(annotations map[string]string, imageio *cdiv1.DataVolumeSourceImageIO) {
×
1793
        annotations[AnnEndpoint] = imageio.URL
×
1794
        annotations[AnnSource] = SourceImageio
×
1795
        annotations[AnnSecret] = imageio.SecretRef
×
1796
        annotations[AnnCertConfigMap] = imageio.CertConfigMap
×
1797
        annotations[AnnDiskID] = imageio.DiskID
×
1798
        if imageio.InsecureSkipVerify != nil && *imageio.InsecureSkipVerify {
×
1799
                annotations[AnnInsecureSkipVerify] = "true"
×
1800
        }
×
1801
}
1802

×
1803
// IsPVBoundToPVC checks if a PV is bound to a specific PVC
×
1804
func IsPVBoundToPVC(pv *corev1.PersistentVolume, pvc *corev1.PersistentVolumeClaim) bool {
×
1805
        claimRef := pv.Spec.ClaimRef
×
1806
        return claimRef != nil && claimRef.Name == pvc.Name && claimRef.Namespace == pvc.Namespace && claimRef.UID == pvc.UID
×
1807
}
×
1808

×
1809
// Rebind binds the PV of source to target
×
1810
func Rebind(ctx context.Context, c client.Client, source, target *corev1.PersistentVolumeClaim) error {
1811
        pv := &corev1.PersistentVolume{
×
1812
                ObjectMeta: metav1.ObjectMeta{
×
1813
                        Name: source.Spec.VolumeName,
×
1814
                },
1815
        }
1816

1817
        if err := c.Get(ctx, client.ObjectKeyFromObject(pv), pv); err != nil {
×
1818
                return err
×
1819
        }
×
1820

×
1821
        // Examine the claimref for the PV and see if it's still bound to PVC'
×
1822
        if pv.Spec.ClaimRef == nil {
×
1823
                return fmt.Errorf("PV %s claimRef is nil", pv.Name)
×
1824
        }
×
1825

×
1826
        if !IsPVBoundToPVC(pv, source) {
×
1827
                // Something is not right if the PV is neither bound to PVC' nor target PVC
×
1828
                if !IsPVBoundToPVC(pv, target) {
×
1829
                        klog.Errorf("PV bound to unexpected PVC: Could not rebind to target PVC '%s'", target.Name)
×
1830
                        return fmt.Errorf("PV %s bound to unexpected claim %s", pv.Name, pv.Spec.ClaimRef.Name)
1831
                }
1832
                // our work is done
1833
                return nil
×
1834
        }
×
1835

×
1836
        // Rebind PVC to target PVC
×
1837
        pv.Spec.ClaimRef = &corev1.ObjectReference{
×
1838
                Namespace:       target.Namespace,
×
1839
                Name:            target.Name,
×
1840
                UID:             target.UID,
×
1841
                ResourceVersion: target.ResourceVersion,
×
1842
        }
1843
        klog.V(3).Info("Rebinding PV to target PVC", "PVC", target.Name)
1844
        if err := c.Update(context.TODO(), pv); err != nil {
1845
                return err
1✔
1846
        }
1✔
1847

1✔
1848
        return nil
1✔
1849
}
1850

1851
// BulkDeleteResources deletes a bunch of resources
1✔
1852
func BulkDeleteResources(ctx context.Context, c client.Client, obj client.ObjectList, lo client.ListOption) error {
1✔
1853
        if err := c.List(ctx, obj, lo); err != nil {
1✔
1854
                if meta.IsNoMatchError(err) {
1✔
1855
                        return nil
1✔
1856
                }
1✔
1857
                return err
1✔
1858
        }
2✔
1859

1✔
1860
        sv := reflect.ValueOf(obj).Elem()
1✔
1861
        iv := sv.FieldByName("Items")
1862

1863
        for i := 0; i < iv.Len(); i++ {
1✔
1864
                obj := iv.Index(i).Addr().Interface().(client.Object)
×
1865
                if obj.GetDeletionTimestamp().IsZero() {
×
1866
                        klog.V(3).Infof("Deleting type %+v %+v", reflect.TypeOf(obj), obj)
1867
                        if err := c.Delete(ctx, obj); err != nil {
2✔
1868
                                return err
1✔
1869
                        }
2✔
1870
                }
1✔
1871
        }
1✔
1872

1✔
1873
        return nil
1874
}
1✔
1875

1876
// ValidateSnapshotCloneSize does proper size validation when doing a clone from snapshot operation
1877
func ValidateSnapshotCloneSize(snapshot *snapshotv1.VolumeSnapshot, pvcSpec *corev1.PersistentVolumeClaimSpec, targetSC *storagev1.StorageClass, log logr.Logger) (bool, error) {
1878
        restoreSize := snapshot.Status.RestoreSize
1✔
1879
        if restoreSize == nil {
1✔
1880
                return false, fmt.Errorf("snapshot has no RestoreSize")
1✔
1881
        }
1✔
1882
        targetRequest, hasTargetRequest := pvcSpec.Resources.Requests[corev1.ResourceStorage]
1✔
1883
        allowExpansion := targetSC.AllowVolumeExpansion != nil && *targetSC.AllowVolumeExpansion
1✔
1884
        if hasTargetRequest {
1✔
1885
                // otherwise will just use restoreSize
1✔
1886
                if restoreSize.Cmp(targetRequest) < 0 && !allowExpansion {
×
1887
                        log.V(3).Info("Can't expand restored PVC because SC does not allow expansion, need to fall back to host assisted")
×
1888
                        return false, nil
1889
                }
1✔
1890
        }
1891
        return true, nil
1892
}
1893

×
1894
// ValidateSnapshotCloneProvisioners validates the target PVC storage class against the snapshot class provisioner
×
1895
func ValidateSnapshotCloneProvisioners(vsc *snapshotv1.VolumeSnapshotContent, storageClass *storagev1.StorageClass) (bool, error) {
×
1896
        // Do snapshot and storage class validation
×
1897
        if storageClass == nil {
×
1898
                return false, fmt.Errorf("target storage class not found")
×
1899
        }
1900
        if storageClass.Provisioner != vsc.Spec.Driver {
1901
                return false, nil
×
1902
        }
×
1903
        // TODO: get sourceVolumeMode from volumesnapshotcontent and validate against target spec
×
1904
        // currently don't have CRDs in CI with sourceVolumeMode which is pretty new
×
1905
        // converting volume mode is possible but has security implications
×
1906
        return true, nil
×
1907
}
×
1908

×
1909
// GetSnapshotClassForSmartClone looks up the snapshot class based on the storage class
×
1910
func GetSnapshotClassForSmartClone(pvc *corev1.PersistentVolumeClaim, targetPvcStorageClassName, snapshotClassName *string, log logr.Logger, client client.Client, recorder record.EventRecorder) (string, error) {
×
1911
        logger := log.WithName("GetSnapshotClassForSmartClone").V(3)
1912
        // Check if relevant CRDs are available
1913
        if !isCsiCrdsDeployed(client, log) {
1914
                logger.Info("Missing CSI snapshotter CRDs, falling back to host assisted clone")
×
1915
                return "", nil
1916
        }
1917

1918
        targetStorageClass, err := GetStorageClassByNameWithK8sFallback(context.TODO(), client, targetPvcStorageClassName)
×
1919
        if err != nil {
×
1920
                return "", err
×
1921
        }
×
1922
        if targetStorageClass == nil {
×
1923
                logger.Info("Target PVC's Storage Class not found")
×
1924
                return "", nil
×
1925
        }
×
1926

×
1927
        vscName, err := GetVolumeSnapshotClass(context.TODO(), client, pvc, targetStorageClass.Provisioner, snapshotClassName, logger, recorder)
×
1928
        if err != nil {
×
1929
                return "", err
×
1930
        }
×
1931
        if vscName != nil {
1932
                if pvc != nil {
×
1933
                        logger.Info("smart-clone is applicable for datavolume", "datavolume",
1934
                                pvc.Name, "snapshot class", *vscName)
1935
                }
1936
                return *vscName, nil
×
1937
        }
×
1938

×
1939
        logger.Info("Could not match snapshotter with storage class, falling back to host assisted clone")
×
1940
        return "", nil
×
1941
}
×
1942

×
1943
// GetVolumeSnapshotClass looks up the snapshot class based on the driver and an optional specific name
×
1944
// In case of multiple candidates, it returns the default-annotated one, or the sorted list first one if no such default
1945
func GetVolumeSnapshotClass(ctx context.Context, c client.Client, pvc *corev1.PersistentVolumeClaim, driver string, snapshotClassName *string, log logr.Logger, recorder record.EventRecorder) (*string, error) {
1946
        logger := log.WithName("GetVolumeSnapshotClass").V(3)
1947

×
1948
        logEvent := func(message, vscName string) {
1949
                logger.Info(message, "name", vscName)
1950
                if pvc != nil {
1951
                        msg := fmt.Sprintf("%s %s", message, vscName)
×
1952
                        recorder.Event(pvc, corev1.EventTypeNormal, VolumeSnapshotClassSelected, msg)
×
1953
                }
×
1954
        }
×
1955

×
1956
        if snapshotClassName != nil {
×
1957
                vsc := &snapshotv1.VolumeSnapshotClass{}
×
1958
                if err := c.Get(context.TODO(), types.NamespacedName{Name: *snapshotClassName}, vsc); err != nil {
1959
                        return nil, err
×
1960
                }
×
1961
                if vsc.Driver == driver {
×
1962
                        logEvent(MessageStorageProfileVolumeSnapshotClassSelected, vsc.Name)
×
1963
                        return snapshotClassName, nil
×
1964
                }
×
1965
                return nil, nil
×
1966
        }
×
1967

1968
        vscList := &snapshotv1.VolumeSnapshotClassList{}
×
1969
        if err := c.List(ctx, vscList); err != nil {
×
1970
                if meta.IsNoMatchError(err) {
×
1971
                        return nil, nil
×
1972
                }
×
1973
                return nil, err
×
1974
        }
×
1975

×
1976
        var candidates []string
×
1977
        for _, vsc := range vscList.Items {
×
1978
                if vsc.Driver == driver {
1979
                        if vsc.Annotations[AnnDefaultSnapshotClass] == "true" {
1980
                                logEvent(MessageDefaultVolumeSnapshotClassSelected, vsc.Name)
×
1981
                                vscName := vsc.Name
×
1982
                                return &vscName, nil
1983
                        }
1984
                        candidates = append(candidates, vsc.Name)
1985
                }
1986
        }
×
1987

×
1988
        if len(candidates) > 0 {
×
1989
                sort.Strings(candidates)
×
1990
                logEvent(MessageFirstVolumeSnapshotClassSelected, candidates[0])
×
1991
                return &candidates[0], nil
×
1992
        }
×
1993

×
1994
        return nil, nil
×
1995
}
1996

1997
// isCsiCrdsDeployed checks whether the CSI snapshotter CRD are deployed
×
1998
func isCsiCrdsDeployed(c client.Client, log logr.Logger) bool {
×
1999
        version := "v1"
×
2000
        vsClass := "volumesnapshotclasses." + snapshotv1.GroupName
×
2001
        vsContent := "volumesnapshotcontents." + snapshotv1.GroupName
×
2002
        vs := "volumesnapshots." + snapshotv1.GroupName
×
2003

×
2004
        return isCrdDeployed(c, vsClass, version, log) &&
×
2005
                isCrdDeployed(c, vsContent, version, log) &&
×
2006
                isCrdDeployed(c, vs, version, log)
×
2007
}
2008

2009
// isCrdDeployed checks whether a CRD is deployed
×
2010
func isCrdDeployed(c client.Client, name, version string, log logr.Logger) bool {
×
2011
        crd := &extv1.CustomResourceDefinition{}
×
2012
        err := c.Get(context.TODO(), types.NamespacedName{Name: name}, crd)
×
2013
        if err != nil {
×
2014
                if !k8serrors.IsNotFound(err) {
×
2015
                        log.Info("Error looking up CRD", "crd name", name, "version", version, "error", err)
2016
                }
2017
                return false
×
2018
        }
×
2019

×
2020
        for _, v := range crd.Spec.Versions {
×
2021
                if v.Name == version && v.Served {
×
2022
                        return true
×
2023
                }
×
2024
        }
×
2025

×
2026
        return false
2027
}
2028

2029
// IsSnapshotReady indicates if a volume snapshot is ready to be used
×
2030
func IsSnapshotReady(snapshot *snapshotv1.VolumeSnapshot) bool {
×
2031
        return snapshot.Status != nil && snapshot.Status.ReadyToUse != nil && *snapshot.Status.ReadyToUse
×
2032
}
×
2033

×
2034
// GetResource updates given obj with the data of the object with the same name and namespace
2035
func GetResource(ctx context.Context, c client.Client, namespace, name string, obj client.Object) (bool, error) {
×
2036
        obj.SetNamespace(namespace)
2037
        obj.SetName(name)
2038

2039
        err := c.Get(ctx, client.ObjectKeyFromObject(obj), obj)
×
2040
        if err != nil {
×
2041
                if k8serrors.IsNotFound(err) {
×
2042
                        return false, nil
×
2043
                }
×
2044

×
2045
                return false, err
×
2046
        }
×
2047

×
2048
        return true, nil
×
2049
}
2050

2051
// PatchArgs are the args for Patch
// NOTE(review): Patch itself is not visible in this part of the file; the field notes
// below are inferred from names and types only — confirm against Patch.
type PatchArgs struct {
	// Client is a controller-runtime client (presumably the one used to send the patch).
	Client client.Client
	// Log is the logger handed to Patch.
	Log logr.Logger
	// Obj is presumably the desired/modified object.
	Obj client.Object
	// OldObj is presumably the original object the patch is computed against.
	OldObj client.Object
}
×
2058

×
2059
// GetAnnotatedEventSource returns resource referenced by AnnEventSource annotations
2060
func GetAnnotatedEventSource(ctx context.Context, c client.Client, obj client.Object) (client.Object, error) {
2061
        esk, ok := obj.GetAnnotations()[AnnEventSourceKind]
×
2062
        if !ok {
×
2063
                return obj, nil
×
2064
        }
×
2065
        if esk != "PersistentVolumeClaim" {
2066
                return obj, nil
2067
        }
×
2068
        es, ok := obj.GetAnnotations()[AnnEventSource]
2069
        if !ok {
2070
                return obj, nil
2071
        }
×
2072
        namespace, name, err := cache.SplitMetaNamespaceKey(es)
×
2073
        if err != nil {
×
2074
                return nil, err
2075
        }
2076
        pvc := &corev1.PersistentVolumeClaim{
×
2077
                ObjectMeta: metav1.ObjectMeta{
×
2078
                        Namespace: namespace,
×
2079
                        Name:      name,
×
2080
                },
×
2081
        }
×
2082
        if err := c.Get(ctx, client.ObjectKeyFromObject(pvc), pvc); err != nil {
×
2083
                return nil, err
×
2084
        }
×
2085
        return pvc, nil
2086
}
×
2087

2088
// OwnedByDataVolume returns true if the object is owned by a DataVolume
2089
func OwnedByDataVolume(obj metav1.Object) bool {
×
2090
        owner := metav1.GetControllerOf(obj)
2091
        return owner != nil && owner.Kind == "DataVolume"
2092
}
2093

2094
// CopyAllowedAnnotations copies the allowed annotations from the source object
2095
// to the destination object
2096
func CopyAllowedAnnotations(srcObj, dstObj metav1.Object) {
2097
        for ann, def := range allowedAnnotations {
2098
                val, ok := srcObj.GetAnnotations()[ann]
2099
                if !ok && def != "" {
2100
                        val = def
2101
                }
×
2102
                if val != "" {
×
2103
                        klog.V(1).Info("Applying annotation", "Name", dstObj.GetName(), ann, val)
×
2104
                        AddAnnotation(dstObj, ann, val)
×
2105
                }
×
2106
        }
×
2107
}
×
2108

×
2109
// CopyAllowedLabels copies allowed labels matching the validLabelsMatch regexp from the
×
2110
// source map to the destination object allowing overwrites
×
2111
func CopyAllowedLabels(srcLabels map[string]string, dstObj metav1.Object, overwrite bool) {
×
2112
        for label, value := range srcLabels {
×
2113
                if _, found := dstObj.GetLabels()[label]; (!found || overwrite) && validLabelsMatch.MatchString(label) {
×
2114
                        AddLabel(dstObj, label, value)
×
2115
                }
×
2116
        }
×
2117
}
×
2118

×
2119
// ClaimMayExistBeforeDataVolume returns true if the PVC may exist before the DataVolume
×
2120
func ClaimMayExistBeforeDataVolume(c client.Client, pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
×
2121
        if ClaimIsPopulatedForDataVolume(pvc, dv) {
×
2122
                return true, nil
×
2123
        }
×
2124
        return AllowClaimAdoption(c, pvc, dv)
×
2125
}
×
2126

×
2127
// ClaimIsPopulatedForDataVolume returns true if the PVC is populated for the given DataVolume
2128
func ClaimIsPopulatedForDataVolume(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
2129
        return pvc != nil && dv != nil && pvc.Annotations[AnnPopulatedFor] == dv.Name
2130
}
×
2131

×
2132
// AllowClaimAdoption returns true if the PVC may be adopted
×
2133
func AllowClaimAdoption(c client.Client, pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
×
2134
        if pvc == nil || dv == nil {
2135
                return false, nil
2136
        }
2137
        anno, ok := pvc.Annotations[AnnCreatedForDataVolume]
×
2138
        if ok && anno == string(dv.UID) {
×
2139
                return false, nil
×
2140
        }
×
2141
        anno, ok = dv.Annotations[AnnAllowClaimAdoption]
×
2142
        // if annotation exists, go with that regardless of featuregate
×
2143
        if ok {
×
2144
                val, _ := strconv.ParseBool(anno)
×
2145
                return val, nil
×
2146
        }
×
2147
        return featuregates.NewFeatureGates(c).ClaimAdoptionEnabled()
2148
}
2149

2150
// ResolveDataSourceChain resolves a DataSource reference.
2151
// Returns an error if DataSource reference is not found or
2152
// DataSource reference points to another DataSource
1✔
2153
func ResolveDataSourceChain(ctx context.Context, client client.Client, dataSource *cdiv1.DataSource) (*cdiv1.DataSource, error) {
2✔
2154
        if dataSource.Spec.Source.DataSource == nil {
2✔
2155
                return dataSource, nil
1✔
2156
        }
1✔
2157

2158
        ref := dataSource.Spec.Source.DataSource
2159
        refNs := GetNamespace(ref.Namespace, dataSource.Namespace)
2160
        if dataSource.Namespace != refNs {
2161
                return dataSource, ErrDataSourceCrossNamespace
×
2162
        }
×
2163
        if ref.Name == dataSource.Name && refNs == dataSource.Namespace {
×
2164
                return nil, ErrDataSourceSelfReference
×
2165
        }
×
2166

2167
        resolved := &cdiv1.DataSource{}
2168
        if err := client.Get(ctx, types.NamespacedName{Name: ref.Name, Namespace: refNs}, resolved); err != nil {
2169
                return nil, err
×
2170
        }
×
2171

×
2172
        if resolved.Spec.Source.DataSource != nil {
2173
                return nil, ErrDataSourceMaxDepthReached
2174
        }
×
2175

×
2176
        return resolved, nil
×
2177
}
×
2178

×
NEW
2179
// Sorts events in time based order, prioritizing events that were emitted from prime PVCs
×
NEW
2180
func sortEvents(events *corev1.EventList, pvcPrimeName string) {
×
2181
        sort.Slice(events.Items, func(i, j int) bool {
×
NEW
2182
                if pvcPrimeName != "" {
×
2183
                        firstContainsPrime := strings.Contains(events.Items[i].Message, pvcPrimeName)
×
2184
                        secondContainsPrime := strings.Contains(events.Items[j].Message, pvcPrimeName)
×
2185

×
2186
                        if firstContainsPrime && !secondContainsPrime {
×
2187
                                return true
×
2188
                        }
×
2189
                        if !firstContainsPrime && secondContainsPrime {
2190
                                return false
2191
                        }
2192
                }
2193

2194
                // if the timestamps are the same, prioritze longer messages to make sure our sorting is deterministic
×
2195
                if events.Items[i].LastTimestamp.Time.Equal(events.Items[j].LastTimestamp.Time) {
×
2196
                        return len(events.Items[i].Message) > len(events.Items[j].Message)
×
2197
                }
×
2198

2199
                // if both contains primeName substring or neither, just sort on timestamp
×
2200
                return events.Items[i].LastTimestamp.Time.After(events.Items[j].LastTimestamp.Time)
×
2201
        })
×
2202
}
×
2203

×
NEW
2204
func getLatestEventMessage(pvc *corev1.PersistentVolumeClaim, primeName string, c client.Client) string {
×
NEW
2205
        events := &corev1.EventList{}
×
NEW
2206
        err := c.List(context.TODO(), events,
×
2207
                client.InNamespace(pvc.GetNamespace()),
NEW
2208
                client.MatchingFields{"involvedObject.name": pvc.GetName(),
×
NEW
2209
                        "involvedObject.uid": string(pvc.GetUID())},
×
NEW
2210
        )
×
NEW
2211
        if err != nil {
×
2212
                klog.Info("Unable to list events for pvc", "pvc", pvc.Name)
NEW
2213
                return ""
×
NEW
2214
        }
×
NEW
2215

×
2216
        if len(events.Items) == 0 {
NEW
2217
                return ""
×
2218
        }
2219

2220
        sortEvents(events, primeName)
2221

1✔
2222
        if primeName != "" {
2✔
2223
                // prime events get emitted on target PVCs with their name prefixed in square brackets
2✔
2224
                fmtPrimeName := fmt.Sprintf("[%s] : ", primeName)
1✔
2225
                primeIdx := strings.Index(events.Items[0].Message, fmtPrimeName)
1✔
2226
                if primeIdx == -1 {
1✔
2227
                        return ""
2✔
2228
                }
1✔
2229
                return events.Items[0].Message[primeIdx+len(fmtPrimeName):]
1✔
2230
        }
2✔
2231

1✔
2232
        return events.Items[0].Message
1✔
2233
}
2234

2235
// UpdatePVCBoundContion updates the bound condition annotations on the target PVC
2236
//
1✔
NEW
2237
// For populators, the message will come from the prime PVCs bound condition if explicitly set,
×
NEW
2238
// otherwise it will use the prime PVC's latest event message
×
2239
//
2240
// For non-populators, the bound message will come from the PVCs latest event message
2241
func UpdatePVCBoundContion(pvc *corev1.PersistentVolumeClaim, c client.Client) error {
1✔
2242
        currentPvcCopy := pvc.DeepCopy()
2243

2244
        anno := pvc.GetAnnotations()
2245
        if anno == nil {
×
2246
                return nil
×
UNCOV
2247
        }
×
2248

×
NEW
2249
        // we only want to update bound conditions for non prime pvcs
×
NEW
2250
        if _, exists := anno[AnnPopulatorKind]; exists {
×
NEW
2251
                return nil
×
NEW
2252
        }
×
NEW
2253

×
2254
        if IsBound(pvc) {
×
2255
                delete(anno, AnnBoundCondition)
×
2256
                delete(anno, AnnBoundConditionReason)
UNCOV
2257
                delete(anno, AnnBoundConditionMessage)
×
UNCOV
2258

×
UNCOV
2259
                if !reflect.DeepEqual(currentPvcCopy, pvc) {
×
2260
                        patch := client.MergeFrom(currentPvcCopy)
UNCOV
2261
                        if err := c.Patch(context.TODO(), pvc, patch); err != nil {
×
UNCOV
2262
                                return err
×
UNCOV
2263
                        }
×
UNCOV
2264
                }
×
UNCOV
2265

×
UNCOV
2266
                return nil
×
UNCOV
2267
        }
×
UNCOV
2268

×
UNCOV
2269
        if pvc.Status.Phase != corev1.ClaimPending {
×
UNCOV
2270
                return nil
×
2271
        }
2272

UNCOV
2273
        boundMessage := ""
×
2274
        primeName, usingPopulator := anno[AnnPVCPrimeName]
2275

2276
        if usingPopulator {
2277
                pvcPrime := &corev1.PersistentVolumeClaim{}
2278
                if err := c.Get(context.TODO(), types.NamespacedName{Name: primeName, Namespace: pvc.GetNamespace()}, pvcPrime); err != nil {
2279
                        if k8serrors.IsNotFound(err) {
2280
                                klog.V(4).Info("Could not find prime pvc associated to this pvc", "pvc", pvc.Name)
2281
                                return nil
NEW
2282
                        }
×
NEW
2283
                        return err
×
NEW
2284
                }
×
2285

×
NEW
2286
                // prioritize the prime PVC's bound condition message over events
×
NEW
2287
                if msg, exists := pvcPrime.Annotations[AnnBoundConditionMessage]; exists {
×
NEW
2288
                        boundMessage = msg
×
2289
                } else {
2290
                        boundMessage = getLatestEventMessage(pvc, primeName, c)
UNCOV
2291
                }
×
UNCOV
2292
        } else {
×
NEW
2293
                boundMessage = getLatestEventMessage(pvc, primeName, c)
×
2294
        }
NEW
2295

×
NEW
2296
        // return early if no message was found
×
NEW
2297
        if boundMessage == "" {
×
NEW
2298
                return nil
×
UNCOV
2299
        }
×
UNCOV
2300

×
2301
        // since we checked status of phase above, we know this is pending
×
2302
        anno[AnnBoundCondition] = "false"
×
2303
        anno[AnnBoundConditionReason] = "Pending"
×
2304
        anno[AnnBoundConditionMessage] = boundMessage
×
2305

2306
        patch := client.MergeFrom(currentPvcCopy)
2307
        if err := c.Patch(context.TODO(), pvc, patch); err != nil {
×
2308
                return err
2309
        }
2310

×
2311
        return nil
×
2312
}
×
2313

UNCOV
2314
// CopyEvents gets srcPvc events and re-emits them on the target PVC with the src name prefix
×
2315
func CopyEvents(srcPVC, targetPVC client.Object, c client.Client, recorder record.EventRecorder) {
×
2316
        newEvents := &corev1.EventList{}
×
2317
        err := c.List(context.TODO(), newEvents,
×
2318
                client.InNamespace(srcPVC.GetNamespace()),
×
2319
                client.MatchingFields{"involvedObject.name": srcPVC.GetName(),
×
2320
                        "involvedObject.uid": string(srcPVC.GetUID())},
×
2321
        )
×
2322

×
2323
        if err != nil {
×
2324
                klog.Error(err, "Could not retrieve srcPVC list of Events")
×
2325
        }
2326

2327
        currEvents := &corev1.EventList{}
2328
        err = c.List(context.TODO(), currEvents,
×
2329
                client.InNamespace(targetPVC.GetNamespace()),
×
2330
                client.MatchingFields{"involvedObject.name": targetPVC.GetName(),
×
2331
                        "involvedObject.uid": string(targetPVC.GetUID())},
×
2332
        )
×
2333

×
2334
        if err != nil {
×
2335
                klog.Error(err, "Could not retrieve targetPVC list of Events")
×
2336
        }
2337

2338
        // use this to hash each message for quick lookup, value is unused
×
UNCOV
2339
        eventMap := map[string]struct{}{}
×
UNCOV
2340

×
2341
        for _, event := range currEvents.Items {
2342
                eventMap[event.Message] = struct{}{}
2343
        }
×
2344

×
2345
        for _, newEvent := range newEvents.Items {
×
2346
                msg := newEvent.Message
×
2347

×
2348
                // check if target PVC already has this equivalent event
×
UNCOV
2349
                if _, exists := eventMap[msg]; exists {
×
2350
                        continue
×
2351
                }
UNCOV
2352

×
2353
                // format new event message to indicate that it came from the src pvc
2354
                formattedMsg := fmt.Sprintf("[%s] : %s", srcPVC.GetName(), msg)
2355

2356
                // check if we already emitted this event with the src prefix
×
2357
                if _, exists := eventMap[formattedMsg]; exists {
×
2358
                        continue
×
2359
                }
×
NEW
2360

×
2361
                recorder.Event(targetPVC, newEvent.Type, newEvent.Reason, formattedMsg)
×
2362
        }
×
2363
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc