• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 10213342106

02 Aug 2024 09:30AM UTC coverage: 74.235% (+1.0%) from 73.249%
10213342106

Pull #1532

github

k8s-infra-cherrypick-robot
fix volume cloning and add e2e
Pull Request #1532: [release-1.24] cleanup: refactor volume cloning

37 of 51 new or added lines in 1 file covered. (72.55%)

1 existing line in 1 file now uncovered.

2233 of 3008 relevant lines covered (74.24%)

7.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.52
/pkg/blob/controllerserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "os"
24
        "os/exec"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        "google.golang.org/grpc/codes"
30
        "google.golang.org/grpc/status"
31

32
        "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
33
        "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
34
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
35
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
36
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
37
        "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
38
        azstorage "github.com/Azure/azure-sdk-for-go/storage"
39
        "github.com/container-storage-interface/spec/lib/go/csi"
40

41
        "k8s.io/apimachinery/pkg/util/wait"
42
        "k8s.io/klog/v2"
43
        "k8s.io/utils/pointer"
44

45
        "sigs.k8s.io/blob-csi-driver/pkg/util"
46
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient/blobcontainerclient"
47
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
48
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
49
        "sigs.k8s.io/cloud-provider-azure/pkg/provider"
50
        azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
51
)
52

53
const (
54
        // privateEndpoint is the networkEndpointType parameter value that
        // CreateVolume matches case-insensitively to request creation of a
        // private endpoint for the storage account.
        privateEndpoint = "privateendpoint"
55

56
        // The azcopy* constants are presumably names of environment variables
        // passed to the azcopy tool for authentication; their use is not visible
        // in this part of the file (TODO confirm against getAzcopyAuth).
        azcopyAutoLoginType             = "AZCOPY_AUTO_LOGIN_TYPE"
57
        azcopySPAApplicationID          = "AZCOPY_SPA_APPLICATION_ID"
58
        azcopySPAClientSecret           = "AZCOPY_SPA_CLIENT_SECRET"
59
        azcopyTenantID                  = "AZCOPY_TENANT_ID"
60
        azcopyMSIClientID               = "AZCOPY_MSI_CLIENT_ID"
61
        // MSI and SPN look like login-type values (managed identity and service
        // principal respectively) — NOTE(review): confirm where they are consumed.
        MSI                             = "MSI"
62
        SPN                             = "SPN"
63
        // authorizationPermissionMismatch is presumably an Azure storage error
        // code string matched against azcopy/SDK output; verify against its caller.
        authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
64

65
        // createdByMetadata is presumably a metadata key recording the creator
        // of a container; its use is not visible in this part of the file.
        createdByMetadata = "createdBy"
66
)
67

68
// CreateVolume provisions a volume
69
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
22✔
70
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
22✔
71
                klog.Errorf("invalid create volume req: %v", req)
×
72
                return nil, err
×
73
        }
×
74

75
        volName := req.GetName()
22✔
76
        if len(volName) == 0 {
23✔
77
                return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
1✔
78
        }
1✔
79

80
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
23✔
81
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
82
        }
2✔
83

84
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
19✔
85
        requestGiB := int(util.RoundUpGiB(volSizeBytes))
19✔
86

19✔
87
        volContentSource := req.GetVolumeContentSource()
19✔
88
        secrets := req.GetSecrets()
19✔
89

19✔
90
        parameters := req.GetParameters()
19✔
91
        if parameters == nil {
20✔
92
                parameters = make(map[string]string)
1✔
93
        }
1✔
94
        var storageAccountType, subsID, resourceGroup, location, account, containerName, containerNamePrefix, protocol, customTags, secretName, secretNamespace, pvcNamespace, tagValueDelimiter string
19✔
95
        var isHnsEnabled, requireInfraEncryption, enableBlobVersioning, createPrivateEndpoint, enableNfsV3, allowSharedKeyAccess *bool
19✔
96
        var vnetResourceGroup, vnetName, subnetName, accessTier, networkEndpointType, storageEndpointSuffix, fsGroupChangePolicy string
19✔
97
        var matchTags, useDataPlaneAPI, getLatestAccountKey bool
19✔
98
        var softDeleteBlobs, softDeleteContainers int32
19✔
99
        var vnetResourceIDs []string
19✔
100
        var err error
19✔
101
        // set allowBlobPublicAccess as false by default
19✔
102
        allowBlobPublicAccess := pointer.Bool(false)
19✔
103

19✔
104
        containerNameReplaceMap := map[string]string{}
19✔
105

19✔
106
        // store account key to k8s secret by default
19✔
107
        storeAccountKey := true
19✔
108

19✔
109
        // Apply ProvisionerParameters (case-insensitive). We leave validation of
19✔
110
        // the values to the cloud provider.
19✔
111
        for k, v := range parameters {
114✔
112
                switch strings.ToLower(k) {
95✔
113
                case skuNameField:
9✔
114
                        storageAccountType = v
9✔
115
                case storageAccountTypeField:
10✔
116
                        storageAccountType = v
10✔
117
                case locationField:
9✔
118
                        location = v
9✔
119
                case storageAccountField:
10✔
120
                        account = v
10✔
121
                case subscriptionIDField:
×
122
                        subsID = v
×
123
                case resourceGroupField:
9✔
124
                        resourceGroup = v
9✔
125
                case containerNameField:
10✔
126
                        containerName = v
10✔
127
                case containerNamePrefixField:
2✔
128
                        containerNamePrefix = v
2✔
129
                case protocolField:
9✔
130
                        protocol = v
9✔
131
                case tagsField:
1✔
132
                        customTags = v
1✔
133
                case matchTagsField:
1✔
134
                        matchTags = strings.EqualFold(v, trueValue)
1✔
135
                case secretNameField:
×
136
                        secretName = v
×
137
                case secretNamespaceField:
×
138
                        secretNamespace = v
×
139
                case isHnsEnabledField:
×
140
                        if strings.EqualFold(v, trueValue) {
×
141
                                isHnsEnabled = pointer.Bool(true)
×
142
                        }
×
143
                case softDeleteBlobsField:
×
144
                        days, err := parseDays(v)
×
145
                        if err != nil {
×
146
                                return nil, err
×
147
                        }
×
148
                        softDeleteBlobs = days
×
149
                case softDeleteContainersField:
×
150
                        days, err := parseDays(v)
×
151
                        if err != nil {
×
152
                                return nil, err
×
153
                        }
×
154
                        softDeleteContainers = days
×
155
                case enableBlobVersioningField:
×
156
                        enableBlobVersioning = pointer.Bool(strings.EqualFold(v, trueValue))
×
157
                case storeAccountKeyField:
3✔
158
                        if strings.EqualFold(v, falseValue) {
5✔
159
                                storeAccountKey = false
2✔
160
                        }
2✔
161
                case getLatestAccountKeyField:
1✔
162
                        if getLatestAccountKey, err = strconv.ParseBool(v); err != nil {
2✔
163
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", getLatestAccountKeyField, v)
1✔
164
                        }
1✔
165
                case allowBlobPublicAccessField:
×
166
                        if strings.EqualFold(v, trueValue) {
×
167
                                allowBlobPublicAccess = pointer.Bool(true)
×
168
                        }
×
169
                case allowSharedKeyAccessField:
×
170
                        var boolValue bool
×
171
                        if boolValue, err = strconv.ParseBool(v); err != nil {
×
172
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", allowSharedKeyAccessField, v)
×
173
                        }
×
174
                        allowSharedKeyAccess = pointer.Bool(boolValue)
×
175
                case requireInfraEncryptionField:
×
176
                        if strings.EqualFold(v, trueValue) {
×
177
                                requireInfraEncryption = pointer.Bool(true)
×
178
                        }
×
179
                case pvcNamespaceKey:
×
180
                        pvcNamespace = v
×
181
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
182
                case pvcNameKey:
×
183
                        containerNameReplaceMap[pvcNameMetadata] = v
×
184
                case pvNameKey:
×
185
                        containerNameReplaceMap[pvNameMetadata] = v
×
186
                case serverNameField:
×
187
                case storageAuthTypeField:
1✔
188
                case storageIdentityClientIDField:
1✔
189
                case storageIdentityObjectIDField:
1✔
190
                case storageIdentityResourceIDField:
1✔
191
                case msiEndpointField:
1✔
192
                case storageAADEndpointField:
1✔
193
                        // no op, only used in NodeStageVolume
194
                case storageEndpointSuffixField:
×
195
                        storageEndpointSuffix = v
×
196
                case vnetResourceGroupField:
×
197
                        vnetResourceGroup = v
×
198
                case vnetNameField:
×
199
                        vnetName = v
×
200
                case subnetNameField:
1✔
201
                        subnetName = v
1✔
202
                case accessTierField:
×
203
                        accessTier = v
×
204
                case networkEndpointTypeField:
1✔
205
                        networkEndpointType = v
1✔
206
                case mountPermissionsField:
10✔
207
                        // only do validations here, used in NodeStageVolume, NodePublishVolume
10✔
208
                        if v != "" {
20✔
209
                                if _, err := strconv.ParseUint(v, 8, 32); err != nil {
11✔
210
                                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s in storage class", v))
1✔
211
                                }
1✔
212
                        }
213
                case useDataPlaneAPIField:
1✔
214
                        useDataPlaneAPI = strings.EqualFold(v, trueValue)
1✔
215
                case fsGroupChangePolicyField:
1✔
216
                        fsGroupChangePolicy = v
1✔
217
                case tagValueDelimiterField:
×
218
                        tagValueDelimiter = v
×
219
                default:
1✔
220
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid parameter %q in storage class", k))
1✔
221
                }
222
        }
223

224
        if pointer.BoolDeref(enableBlobVersioning, false) {
16✔
225
                if isNFSProtocol(protocol) || pointer.BoolDeref(isHnsEnabled, false) {
×
226
                        return nil, status.Errorf(codes.InvalidArgument, "enableBlobVersioning is not supported for NFS protocol or HNS enabled account")
×
227
                }
×
228
        }
229

230
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
17✔
231
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
232
        }
1✔
233

234
        if matchTags && account != "" {
16✔
235
                return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("matchTags must set as false when storageAccount(%s) is provided", account))
1✔
236
        }
1✔
237

238
        if resourceGroup == "" {
20✔
239
                resourceGroup = d.cloud.ResourceGroup
6✔
240
        }
6✔
241

242
        if secretNamespace == "" {
28✔
243
                if pvcNamespace == "" {
28✔
244
                        secretNamespace = defaultNamespace
14✔
245
                } else {
14✔
246
                        secretNamespace = pvcNamespace
×
247
                }
×
248
        }
249

250
        if protocol == "" {
19✔
251
                protocol = Fuse
5✔
252
        }
5✔
253
        if !isSupportedProtocol(protocol) {
15✔
254
                return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
1✔
255
        }
1✔
256
        if !isSupportedAccessTier(accessTier) {
13✔
257
                return nil, status.Errorf(codes.InvalidArgument, "accessTier(%s) is not supported, supported AccessTier list: %v", accessTier, armstorage.PossibleAccessTierValues())
×
258
        }
×
259

260
        if containerName != "" && containerNamePrefix != "" {
14✔
261
                return nil, status.Errorf(codes.InvalidArgument, "containerName(%s) and containerNamePrefix(%s) could not be specified together", containerName, containerNamePrefix)
1✔
262
        }
1✔
263
        if !isSupportedContainerNamePrefix(containerNamePrefix) {
13✔
264
                return nil, status.Errorf(codes.InvalidArgument, "containerNamePrefix(%s) can only contain lowercase letters, numbers, hyphens, and length should be less than 21", containerNamePrefix)
1✔
265
        }
1✔
266

267
        enableHTTPSTrafficOnly := true
11✔
268
        if strings.EqualFold(networkEndpointType, privateEndpoint) {
12✔
269
                if strings.Contains(subnetName, ",") {
2✔
270
                        return nil, status.Errorf(codes.InvalidArgument, "subnetName(%s) can only contain one subnet for private endpoint", subnetName)
1✔
271
                }
1✔
272
                createPrivateEndpoint = pointer.BoolPtr(true)
×
273
        }
274
        accountKind := string(armstorage.KindStorageV2)
10✔
275
        if isNFSProtocol(protocol) {
11✔
276
                isHnsEnabled = pointer.Bool(true)
1✔
277
                enableNfsV3 = pointer.Bool(true)
1✔
278
                // NFS protocol does not need account key
1✔
279
                storeAccountKey = false
1✔
280
                if !pointer.BoolDeref(createPrivateEndpoint, false) {
2✔
281
                        // set VirtualNetworkResourceIDs for storage account firewall setting
1✔
282
                        subnets := strings.Split(subnetName, ",")
1✔
283
                        for _, subnet := range subnets {
2✔
284
                                subnet = strings.TrimSpace(subnet)
1✔
285
                                vnetResourceID := d.getSubnetResourceID(vnetResourceGroup, vnetName, subnet)
1✔
286
                                klog.V(2).Infof("set vnetResourceID(%s) for NFS protocol", vnetResourceID)
1✔
287
                                vnetResourceIDs = []string{vnetResourceID}
1✔
288
                                if err := d.updateSubnetServiceEndpoints(ctx, vnetResourceGroup, vnetName, subnet); err != nil {
2✔
289
                                        return nil, status.Errorf(codes.Internal, "update service endpoints failed with error: %v", err)
1✔
290
                                }
1✔
291
                        }
292
                }
293
        }
294

295
        if strings.HasPrefix(strings.ToLower(storageAccountType), "premium") {
10✔
296
                accountKind = string(armstorage.KindBlockBlobStorage)
1✔
297
        }
1✔
298
        if IsAzureStackCloud(d.cloud) {
10✔
299
                accountKind = string(armstorage.KindStorage)
1✔
300
                if storageAccountType != "" && storageAccountType != string(armstorage.SKUNameStandardLRS) && storageAccountType != string(armstorage.SKUNamePremiumLRS) {
2✔
301
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("Invalid skuName value: %s, as Azure Stack only supports %s and %s Storage Account types.", storageAccountType, storage.SkuNamePremiumLRS, storage.SkuNameStandardLRS))
1✔
302
                }
1✔
303
        }
304

305
        tags, err := util.ConvertTagsToMap(customTags, tagValueDelimiter)
8✔
306
        if err != nil {
9✔
307
                return nil, status.Errorf(codes.InvalidArgument, err.Error())
1✔
308
        }
1✔
309

310
        if strings.TrimSpace(storageEndpointSuffix) == "" {
14✔
311
                storageEndpointSuffix = d.getStorageEndPointSuffix()
7✔
312
        }
7✔
313

314
        if storeAccountKey && !pointer.BoolDeref(allowSharedKeyAccess, true) {
7✔
315
                return nil, status.Errorf(codes.InvalidArgument, "storeAccountKey is not supported for account with shared access key disabled")
×
316
        }
×
317

318
        accountOptions := &azure.AccountOptions{
7✔
319
                Name:                            account,
7✔
320
                Type:                            storageAccountType,
7✔
321
                Kind:                            accountKind,
7✔
322
                SubscriptionID:                  subsID,
7✔
323
                ResourceGroup:                   resourceGroup,
7✔
324
                Location:                        location,
7✔
325
                EnableHTTPSTrafficOnly:          enableHTTPSTrafficOnly,
7✔
326
                VirtualNetworkResourceIDs:       vnetResourceIDs,
7✔
327
                Tags:                            tags,
7✔
328
                MatchTags:                       matchTags,
7✔
329
                IsHnsEnabled:                    isHnsEnabled,
7✔
330
                EnableNfsV3:                     enableNfsV3,
7✔
331
                AllowBlobPublicAccess:           allowBlobPublicAccess,
7✔
332
                AllowSharedKeyAccess:            allowSharedKeyAccess,
7✔
333
                RequireInfrastructureEncryption: requireInfraEncryption,
7✔
334
                VNetResourceGroup:               vnetResourceGroup,
7✔
335
                VNetName:                        vnetName,
7✔
336
                SubnetName:                      subnetName,
7✔
337
                AccessTier:                      accessTier,
7✔
338
                CreatePrivateEndpoint:           createPrivateEndpoint,
7✔
339
                StorageType:                     provider.StorageTypeBlob,
7✔
340
                StorageEndpointSuffix:           storageEndpointSuffix,
7✔
341
                EnableBlobVersioning:            enableBlobVersioning,
7✔
342
                SoftDeleteBlobs:                 softDeleteBlobs,
7✔
343
                SoftDeleteContainers:            softDeleteContainers,
7✔
344
                GetLatestAccountKey:             getLatestAccountKey,
7✔
345
        }
7✔
346

7✔
347
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
7✔
348
        validContainerName := containerName
7✔
349
        if validContainerName == "" {
8✔
350
                validContainerName = volName
1✔
351
                if containerNamePrefix != "" {
1✔
352
                        validContainerName = containerNamePrefix + "-" + volName
×
353
                }
×
354
                validContainerName = getValidContainerName(validContainerName, protocol)
1✔
355
                setKeyValueInMap(parameters, containerNameField, validContainerName)
1✔
356
        }
357

358
        if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
7✔
359
                // logging the job status if it's volume cloning
×
360
                if volContentSource != nil {
×
361
                        jobState, percent, err := d.azcopy.GetAzcopyJob(validContainerName, []string{})
×
362
                        return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
×
363
                }
×
364
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
×
365
        }
366
        defer d.volumeLocks.Release(volName)
7✔
367

7✔
368
        requestName := "controller_create_volume"
7✔
369
        if volContentSource != nil {
9✔
370
                switch volContentSource.Type.(type) {
2✔
371
                case *csi.VolumeContentSource_Snapshot:
1✔
372
                        return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
1✔
373
                case *csi.VolumeContentSource_Volume:
1✔
374
                        requestName = "controller_create_volume_from_volume"
1✔
375
                }
376
        }
377

378
        var volumeID string
6✔
379
        mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
6✔
380
        isOperationSucceeded := false
6✔
381
        defer func() {
12✔
382
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
6✔
383
        }()
6✔
384

385
        var accountKey string
6✔
386
        accountName := account
6✔
387
        if len(secrets) == 0 && accountName == "" {
7✔
388
                if v, ok := d.volMap.Load(volName); ok {
1✔
389
                        accountName = v.(string)
×
390
                } else {
1✔
391
                        lockKey := fmt.Sprintf("%s%s%s%s%s%v", storageAccountType, accountKind, resourceGroup, location, protocol, pointer.BoolDeref(createPrivateEndpoint, false))
1✔
392
                        // search in cache first
1✔
393
                        cache, err := d.accountSearchCache.Get(lockKey, azcache.CacheReadTypeDefault)
1✔
394
                        if err != nil {
1✔
395
                                return nil, status.Errorf(codes.Internal, err.Error())
×
396
                        }
×
397
                        if cache != nil {
1✔
398
                                accountName = cache.(string)
×
399
                        } else {
1✔
400
                                d.volLockMap.LockEntry(lockKey)
1✔
401
                                err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
2✔
402
                                        var retErr error
1✔
403
                                        accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
1✔
404
                                        if isRetriableError(retErr) {
1✔
405
                                                klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
×
406
                                                return false, nil
×
407
                                        }
×
408
                                        return true, retErr
1✔
409
                                })
410
                                d.volLockMap.UnlockEntry(lockKey)
1✔
411
                                if err != nil {
2✔
412
                                        return nil, status.Errorf(codes.Internal, "ensure storage account failed with %v", err)
1✔
413
                                }
1✔
414
                                d.accountSearchCache.Set(lockKey, accountName)
×
415
                                d.volMap.Store(volName, accountName)
×
416
                        }
417
                }
418
        }
419

420
        if pointer.BoolDeref(createPrivateEndpoint, false) && isNFSProtocol(protocol) {
5✔
421
                // As for blobfuse/blobfuse2, serverName, i.e.,AZURE_STORAGE_BLOB_ENDPOINT env variable can't include
×
422
                // "privatelink", issue: https://github.com/Azure/azure-storage-fuse/issues/1014
×
423
                //
×
424
                // And use public endpoint will be befine to blobfuse/blobfuse2, because it will be resolved to private endpoint
×
425
                // by private dns zone, which includes CNAME record, documented here:
×
426
                // https://learn.microsoft.com/en-us/azure/storage/common/storage-private-endpoints?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#dns-changes-for-private-endpoints
×
427
                setKeyValueInMap(parameters, serverNameField, fmt.Sprintf("%s.privatelink.blob.%s", accountName, storageEndpointSuffix))
×
428
        }
×
429

430
        accountOptions.Name = accountName
5✔
431
        if len(secrets) == 0 && useDataPlaneAPI {
6✔
432
                if accountKey == "" {
2✔
433
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
2✔
434
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
435
                        }
1✔
436
                }
437
                secrets = createStorageAccountSecret(accountName, accountKey)
×
438
        }
439

440
        klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
4✔
441
        if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
5✔
442
                return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
1✔
443
        }
1✔
444
        if volContentSource != nil {
4✔
445
                accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
1✔
446
                if err != nil {
1✔
NEW
447
                        return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
UNCOV
448
                }
×
449
                if err := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix); err != nil {
2✔
450
                        return nil, err
1✔
451
                }
1✔
452
        }
453

454
        if storeAccountKey && len(req.GetSecrets()) == 0 {
4✔
455
                if accountKey == "" {
4✔
456
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
3✔
457
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
458
                        }
1✔
459
                }
460

461
                secretName, err := setAzureCredentials(ctx, d.KubeClient, accountName, accountKey, secretNamespace)
1✔
462
                if err != nil {
1✔
463
                        return nil, status.Errorf(codes.Internal, "failed to store storage account key: %v", err)
×
464
                }
×
465
                if secretName != "" {
1✔
466
                        klog.V(2).Infof("store account key to k8s secret(%v) in %s namespace", secretName, secretNamespace)
×
467
                }
×
468
        }
469

470
        var uuid string
1✔
471
        if containerName != "" {
2✔
472
                // add volume name as suffix to differentiate volumeID since "containerName" is specified
1✔
473
                // not necessary for dynamic container name creation since volumeID already contains volume name
1✔
474
                uuid = volName
1✔
475
        }
1✔
476
        volumeID = fmt.Sprintf(volumeIDTemplate, resourceGroup, accountName, validContainerName, uuid, secretNamespace, subsID)
1✔
477
        klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)
1✔
478

1✔
479
        if useDataPlaneAPI {
1✔
480
                d.dataPlaneAPIVolCache.Set(volumeID, "")
×
481
                d.dataPlaneAPIVolCache.Set(accountName, "")
×
482
        }
×
483

484
        isOperationSucceeded = true
1✔
485
        // reset secretNamespace field in VolumeContext
1✔
486
        setKeyValueInMap(parameters, secretNamespaceField, secretNamespace)
1✔
487
        return &csi.CreateVolumeResponse{
1✔
488
                Volume: &csi.Volume{
1✔
489
                        VolumeId:      volumeID,
1✔
490
                        CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
1✔
491
                        VolumeContext: parameters,
1✔
492
                        ContentSource: volContentSource,
1✔
493
                },
1✔
494
        }, nil
1✔
495
}
496

497
// DeleteVolume delete a volume
498
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
5✔
499
        volumeID := req.GetVolumeId()
5✔
500
        if len(volumeID) == 0 {
6✔
501
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
502
        }
1✔
503

504
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
4✔
505
                return nil, status.Errorf(codes.Internal, "invalid delete volume req: %v", req)
×
506
        }
×
507

508
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
4✔
509
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
510
        }
×
511
        defer d.volumeLocks.Release(volumeID)
4✔
512

4✔
513
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
4✔
514
        if err != nil {
5✔
515
                // According to CSI Driver Sanity Tester, should succeed when an invalid volume id is used
1✔
516
                klog.Errorf("GetContainerInfo(%s) in DeleteVolume failed with error: %v", volumeID, err)
1✔
517
                return &csi.DeleteVolumeResponse{}, nil
1✔
518
        }
1✔
519

520
        secrets := req.GetSecrets()
3✔
521
        if len(secrets) == 0 && d.useDataPlaneAPI(volumeID, accountName) {
4✔
522
                _, accountName, accountKey, _, _, err := d.GetAuthEnv(ctx, volumeID, "", nil, secrets)
1✔
523
                if err != nil {
2✔
524
                        return nil, status.Errorf(codes.Internal, "GetAuthEnv(%s) failed with %v", volumeID, err)
1✔
525
                }
1✔
526
                if accountName != "" && accountKey != "" {
×
527
                        secrets = createStorageAccountSecret(accountName, accountKey)
×
528
                }
×
529
        }
530

531
        mc := metrics.NewMetricContext(blobCSIDriverName, "controller_delete_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
2✔
532
        isOperationSucceeded := false
2✔
533
        defer func() {
4✔
534
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
2✔
535
        }()
2✔
536

537
        if resourceGroupName == "" {
3✔
538
                resourceGroupName = d.cloud.ResourceGroup
1✔
539
        }
1✔
540
        klog.V(2).Infof("deleting container(%s) rg(%s) account(%s) volumeID(%s)", containerName, resourceGroupName, accountName, volumeID)
2✔
541
        if err := d.DeleteBlobContainer(ctx, subsID, resourceGroupName, accountName, containerName, secrets); err != nil {
4✔
542
                return nil, status.Errorf(codes.Internal, "failed to delete container(%s) under rg(%s) account(%s) volumeID(%s), error: %v", containerName, resourceGroupName, accountName, volumeID, err)
2✔
543
        }
2✔
544

545
        isOperationSucceeded = true
×
546
        klog.V(2).Infof("container(%s) under rg(%s) account(%s) volumeID(%s) is deleted successfully", containerName, resourceGroupName, accountName, volumeID)
×
547
        return &csi.DeleteVolumeResponse{}, nil
×
548
}
549

550
// ValidateVolumeCapabilities return the capabilities of the volume
551
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
8✔
552
        volumeID := req.GetVolumeId()
8✔
553
        if len(volumeID) == 0 {
9✔
554
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
555
        }
1✔
556
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
9✔
557
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
558
        }
2✔
559

560
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
5✔
561
        if err != nil {
6✔
562
                klog.Errorf("GetContainerInfo(%s) in ValidateVolumeCapabilities failed with error: %v", volumeID, err)
1✔
563
                return nil, status.Error(codes.NotFound, err.Error())
1✔
564
        }
1✔
565

566
        var exist bool
4✔
567
        secrets := req.GetSecrets()
4✔
568
        if len(secrets) > 0 {
5✔
569
                container, err := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
570
                if err != nil {
2✔
571
                        return nil, status.Error(codes.Internal, err.Error())
1✔
572
                }
1✔
573
                exist, err = container.Exists()
×
574
                if err != nil {
×
575
                        return nil, status.Error(codes.Internal, err.Error())
×
576
                }
×
577
        } else {
3✔
578
                if resourceGroupName == "" {
3✔
579
                        resourceGroupName = d.cloud.ResourceGroup
×
580
                }
×
581
                blobClient, err := d.clientFactory.GetBlobContainerClientForSub(subsID)
3✔
582
                if err != nil {
3✔
583
                        return nil, status.Error(codes.Internal, err.Error())
×
584
                }
×
585

586
                blobContainer, err := blobClient.Get(ctx, resourceGroupName, accountName, containerName)
3✔
587
                if err != nil {
4✔
588
                        return nil, status.Error(codes.Internal, err.Error())
1✔
589
                }
1✔
590
                if blobContainer.ContainerProperties == nil {
3✔
591
                        return nil, status.Errorf(codes.Internal, "ContainerProperties of volume(%s) is nil", volumeID)
1✔
592
                }
1✔
593
                exist = blobContainer.ContainerProperties.Deleted != nil && !*blobContainer.ContainerProperties.Deleted
1✔
594
        }
595
        if !exist {
2✔
596
                return nil, status.Errorf(codes.NotFound, "requested volume(%s) does not exist", volumeID)
1✔
597
        }
1✔
598
        klog.V(2).Infof("ValidateVolumeCapabilities on volume(%s) succeeded", volumeID)
×
599

×
600
        // blob driver supports all AccessModes, no need to check capabilities here
×
601
        return &csi.ValidateVolumeCapabilitiesResponse{
×
602
                Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
×
603
                        VolumeCapabilities: req.GetVolumeCapabilities(),
×
604
                },
×
605
                Message: "",
×
606
        }, nil
×
607
}
608

609
func (d *Driver) ControllerPublishVolume(_ context.Context, _ *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
1✔
610
        return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not yet implemented")
1✔
611
}
1✔
612

613
func (d *Driver) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
1✔
614
        return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not yet implemented")
1✔
615
}
1✔
616

617
// ControllerGetVolume get volume
618
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
1✔
619
        return nil, status.Error(codes.Unimplemented, "ControllerGetVolume is not yet implemented")
1✔
620
}
1✔
621

622
// GetCapacity returns the capacity of the total available storage pool
623
func (d *Driver) GetCapacity(_ context.Context, _ *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
1✔
624
        return nil, status.Error(codes.Unimplemented, "GetCapacity is not yet implemented")
1✔
625
}
1✔
626

627
// ListVolumes return all available volumes
628
func (d *Driver) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
1✔
629
        return nil, status.Error(codes.Unimplemented, "ListVolumes is not yet implemented")
1✔
630
}
1✔
631

632
// CreateSnapshot create snapshot
633
func (d *Driver) CreateSnapshot(_ context.Context, _ *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
1✔
634
        return nil, status.Error(codes.Unimplemented, "CreateSnapshot is not yet implemented")
1✔
635
}
1✔
636

637
// DeleteSnapshot delete snapshot
638
func (d *Driver) DeleteSnapshot(_ context.Context, _ *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
1✔
639
        return nil, status.Error(codes.Unimplemented, "DeleteSnapshot is not yet implemented")
1✔
640
}
1✔
641

642
// ListSnapshots list snapshots
643
func (d *Driver) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
1✔
644
        return nil, status.Error(codes.Unimplemented, "ListSnapshots is not yet implemented")
1✔
645
}
1✔
646

647
// ControllerGetCapabilities returns the capabilities of the Controller plugin
648
func (d *Driver) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
1✔
649
        return &csi.ControllerGetCapabilitiesResponse{
1✔
650
                Capabilities: d.Cap,
1✔
651
        }, nil
1✔
652
}
1✔
653

654
// ControllerExpandVolume controller expand volume
655
func (d *Driver) ControllerExpandVolume(_ context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
4✔
656
        if len(req.GetVolumeId()) == 0 {
5✔
657
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
658
        }
1✔
659

660
        if req.GetCapacityRange() == nil {
4✔
661
                return nil, status.Error(codes.InvalidArgument, "Capacity Range missing in request")
1✔
662
        }
1✔
663

664
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
2✔
665
                return nil, status.Errorf(codes.Internal, "invalid expand volume req: %v", req)
×
666
        }
×
667

668
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
2✔
669
        requestGiB := int64(util.RoundUpGiB(volSizeBytes))
2✔
670

2✔
671
        if volSizeBytes > containerMaxSize {
3✔
672
                return nil, status.Errorf(codes.OutOfRange, "required bytes (%d) exceeds the maximum supported bytes (%d)", volSizeBytes, containerMaxSize)
1✔
673
        }
1✔
674

675
        klog.V(2).Infof("ControllerExpandVolume(%s) successfully, currentQuota: %d Gi", req.VolumeId, requestGiB)
1✔
676

1✔
677
        return &csi.ControllerExpandVolumeResponse{CapacityBytes: req.GetCapacityRange().GetRequiredBytes()}, nil
1✔
678
}
679

680
// CreateBlobContainer creates a blob container
681
func (d *Driver) CreateBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
10✔
682
        if containerName == "" {
11✔
683
                return fmt.Errorf("containerName is empty")
1✔
684
        }
1✔
685
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
18✔
686
                var err error
9✔
687
                if len(secrets) > 0 {
10✔
688
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
689
                        if getErr != nil {
2✔
690
                                return true, getErr
1✔
691
                        }
1✔
692
                        container.Metadata = map[string]string{createdByMetadata: d.Name}
×
693
                        _, err = container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
×
694
                } else {
8✔
695
                        blobContainer := armstorage.BlobContainer{
8✔
696
                                ContainerProperties: &armstorage.ContainerProperties{
8✔
697
                                        PublicAccess: to.Ptr(armstorage.PublicAccessNone),
8✔
698
                                        Metadata:     map[string]*string{createdByMetadata: to.Ptr(d.Name)},
8✔
699
                                },
8✔
700
                        }
8✔
701
                        var blobClient blobcontainerclient.Interface
8✔
702
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
8✔
703
                        if err != nil {
8✔
704
                                return true, err
×
705
                        }
×
706
                        _, err = blobClient.CreateContainer(ctx, resourceGroupName, accountName, containerName, blobContainer)
8✔
707
                }
708
                if err != nil {
12✔
709
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
4✔
710
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) {
6✔
711
                                klog.Warningf("CreateContainer(%s, %s, %s) failed with error(%v), retry", resourceGroupName, accountName, containerName, err)
2✔
712
                                return false, nil
2✔
713
                        }
2✔
714
                }
715
                return true, err
6✔
716
        })
717
}
718

719
// DeleteBlobContainer deletes a blob container
720
func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
8✔
721
        if containerName == "" {
9✔
722
                return fmt.Errorf("containerName is empty")
1✔
723
        }
1✔
724
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
14✔
725
                var err error
7✔
726
                if len(secrets) > 0 {
10✔
727
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
3✔
728
                        if getErr != nil {
6✔
729
                                return true, getErr
3✔
730
                        }
3✔
731
                        _, err = container.DeleteIfExists(nil)
×
732
                } else {
4✔
733
                        var blobClient blobcontainerclient.Interface
4✔
734
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
4✔
735
                        if err != nil {
4✔
736
                                return true, err
×
737
                        }
×
738
                        err = blobClient.DeleteContainer(ctx, resourceGroupName, accountName, containerName)
4✔
739
                }
740
                if err != nil {
7✔
741
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
3✔
742
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) ||
3✔
743
                                strings.Contains(err.Error(), statusCodeNotFound) ||
3✔
744
                                strings.Contains(err.Error(), httpCodeNotFound) {
5✔
745
                                klog.Warningf("delete container(%s) on account(%s) failed with error(%v), return as success", containerName, accountName, err)
2✔
746
                                return true, nil
2✔
747
                        }
2✔
748
                        return false, fmt.Errorf("failed to delete container(%s) on account(%s), error: %w", containerName, accountName, err)
1✔
749
                }
750
                return true, err
1✔
751
        })
752
}
753

754
// copyBlobContainer copies source volume content into a destination volume
755
func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName string, dstAccountSasToken string, authAzcopyEnv []string, dstContainerName string, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
7✔
756
        var sourceVolumeID string
7✔
757
        if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
14✔
758
                sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
7✔
759

7✔
760
        }
7✔
761
        srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
7✔
762
        if err != nil {
9✔
763
                return status.Error(codes.NotFound, err.Error())
2✔
764
        }
2✔
765
        if dstAccountName == "" {
9✔
766
                dstAccountName = srcAccountName
4✔
767
        }
4✔
768
        if srcAccountName == "" || srcContainerName == "" || dstContainerName == "" {
7✔
769
                return fmt.Errorf("srcAccountName(%s) or srcContainerName(%s) or dstContainerName(%s) is empty", srcAccountName, srcContainerName, dstContainerName)
2✔
770
        }
2✔
771
        srcAccountSasToken := dstAccountSasToken
3✔
772
        if srcAccountName != dstAccountName && dstAccountSasToken != "" {
4✔
773
                srcAccountOptions := &azure.AccountOptions{
1✔
774
                        Name:                srcAccountName,
1✔
775
                        ResourceGroup:       srcResourceGroupName,
1✔
776
                        SubscriptionID:      srcSubscriptionID,
1✔
777
                        GetLatestAccountKey: accountOptions.GetLatestAccountKey,
1✔
778
                }
1✔
779
                if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace); err != nil {
2✔
780
                        return err
1✔
781
                }
1✔
782
        }
783
        srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", srcAccountName, storageEndpointSuffix, srcContainerName, srcAccountSasToken)
2✔
784
        dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", dstAccountName, storageEndpointSuffix, dstContainerName, dstAccountSasToken)
2✔
785

2✔
786
        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
2✔
787
        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
2✔
788
        switch jobState {
2✔
789
        case util.AzcopyJobError, util.AzcopyJobCompleted:
1✔
790
                return err
1✔
791
        case util.AzcopyJobRunning:
1✔
792
                return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
1✔
793
        case util.AzcopyJobNotFound:
×
NEW
794
                klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName)
×
795
                execFunc := func() error {
×
NEW
796
                        if out, err := d.execAzcopyCopy(srcPath, dstPath, azcopyCloneVolumeOptions, authAzcopyEnv); err != nil {
×
797
                                return fmt.Errorf("exec error: %v, output: %v", err, string(out))
×
798
                        }
×
799
                        return nil
×
800
                }
801
                timeoutFunc := func() error {
×
802
                        _, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
×
803
                        return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcPath, dstContainerName, percent)
×
804
                }
×
805
                copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
×
806
                if copyErr != nil {
×
NEW
807
                        klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, copyErr)
×
808
                } else {
×
NEW
809
                        klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
×
810
                }
×
811
                return copyErr
×
812
        }
813
        return err
×
814
}
815

816
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
817
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName string, accountSASToken string, authAzcopyEnv []string, dstContainerName, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
8✔
818
        vs := req.VolumeContentSource
8✔
819
        switch vs.Type.(type) {
8✔
820
        case *csi.VolumeContentSource_Snapshot:
1✔
821
                return status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
1✔
822
        case *csi.VolumeContentSource_Volume:
7✔
823
                return d.copyBlobContainer(ctx, req, accountName, accountSASToken, authAzcopyEnv, dstContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
7✔
NEW
824
        default:
×
NEW
825
                return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
×
826
        }
827
}
828

829
// execAzcopyCopy exec azcopy copy command
NEW
830
func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) {
×
NEW
831
        cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
×
NEW
832
        cmd.Args = append(cmd.Args, azcopyCopyOptions...)
×
NEW
833
        if len(authAzcopyEnv) > 0 {
×
NEW
834
                cmd.Env = append(os.Environ(), authAzcopyEnv...)
×
NEW
835
        }
×
NEW
836
        return cmd.CombinedOutput()
×
837
}
838

839
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
840
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
7✔
841
        azureAuthConfig := d.cloud.Config.AzureAuthConfig
7✔
842
        var authAzcopyEnv []string
7✔
843
        if azureAuthConfig.UseManagedIdentityExtension {
10✔
844
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
3✔
845
                if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
4✔
846
                        klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
1✔
847
                        authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
1✔
848
                } else {
3✔
849
                        klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
2✔
850
                }
2✔
851
                return authAzcopyEnv, nil
3✔
852
        }
853
        if len(azureAuthConfig.AADClientSecret) > 0 {
6✔
854
                klog.V(2).Infof("use service principal to authorize azcopy")
2✔
855
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
2✔
856
                if azureAuthConfig.AADClientID == "" || azureAuthConfig.TenantID == "" {
3✔
857
                        return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
1✔
858
                }
1✔
859
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
1✔
860
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
1✔
861
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureAuthConfig.TenantID))
1✔
862
                klog.V(2).Infof(fmt.Sprintf("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureAuthConfig.TenantID))
1✔
863

1✔
864
                return authAzcopyEnv, nil
1✔
865
        }
866
        return []string{}, fmt.Errorf("service principle or managed identity are both not set")
2✔
867
}
868

869
// getAzcopyAuth will only generate sas token for azcopy in following conditions:
870
// 1. secrets is not empty
871
// 2. driver is not using managed identity and service principal
872
// 3. azcopy returns AuthorizationPermissionMismatch error when using service principal or managed identity
873
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string) (string, []string, error) {
6✔
874
        var authAzcopyEnv []string
6✔
875
        var err error
6✔
876
        useSasToken := false
6✔
877
        if !d.useDataPlaneAPI("", accountName) && len(secrets) == 0 && len(secretName) == 0 {
8✔
878
                // search in cache first
2✔
879
                if cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault); err == nil && cache != nil {
2✔
880
                        klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
881
                        return cache.(string), nil, nil
×
882
                }
×
883

884
                authAzcopyEnv, err = d.authorizeAzcopyWithIdentity()
2✔
885
                if err != nil {
3✔
886
                        klog.Warningf("failed to authorize azcopy with identity, error: %v", err)
1✔
887
                } else {
2✔
888
                        if len(authAzcopyEnv) > 0 {
2✔
889
                                out, testErr := d.azcopy.TestListJobs(accountName, storageEndpointSuffix, authAzcopyEnv)
1✔
890
                                if testErr != nil {
1✔
891
                                        return "", nil, fmt.Errorf("azcopy list command failed with error(%v): %v", testErr, out)
×
892
                                }
×
893
                                if strings.Contains(out, authorizationPermissionMismatch) {
1✔
894
                                        klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage Blob Data Contributor\" role to controller identity, fall back to use sas token, original output: %v", out)
×
895
                                        useSasToken = true
×
896
                                }
×
897
                        }
898
                }
899
        }
900

901
        if len(secrets) > 0 || len(secretName) > 0 || len(authAzcopyEnv) == 0 || useSasToken {
11✔
902
                if accountKey == "" {
10✔
903
                        if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
7✔
904
                                return "", nil, err
2✔
905
                        }
2✔
906
                }
907
                klog.V(2).Infof("generate sas token for account(%s)", accountName)
3✔
908
                sasToken, err := d.generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
3✔
909
                return sasToken, nil, err
3✔
910
        }
911
        return "", authAzcopyEnv, nil
1✔
912
}
913

914
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
915
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
28✔
916
        if len(volCaps) == 0 {
30✔
917
                return fmt.Errorf("volume capabilities missing in request")
2✔
918
        }
2✔
919
        for _, c := range volCaps {
52✔
920
                if c.GetBlock() != nil {
28✔
921
                        return fmt.Errorf("block volume capability not supported")
2✔
922
                }
2✔
923
        }
924
        return nil
24✔
925
}
926

927
func parseDays(dayStr string) (int32, error) {
3✔
928
        days, err := strconv.Atoi(dayStr)
3✔
929
        if err != nil {
4✔
930
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class", softDeleteBlobsField, dayStr))
1✔
931
        }
1✔
932
        if days <= 0 || days > 365 {
3✔
933
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class, should be in range [1, 365]", softDeleteBlobsField, dayStr))
1✔
934
        }
1✔
935

936
        return int32(days), nil
1✔
937
}
938

939
// generateSASToken generate a sas token for storage account
940
func (d *Driver) generateSASToken(accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
5✔
941
        // search in cache first
5✔
942
        cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault)
5✔
943
        if err != nil {
5✔
944
                return "", fmt.Errorf("get(%s) from azcopySasTokenCache failed with error: %v", accountName, err)
×
945
        }
×
946
        if cache != nil {
5✔
947
                klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
948
                return cache.(string), nil
×
949
        }
×
950

951
        credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
5✔
952
        if err != nil {
8✔
953
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, err: %s", accountName, err.Error()))
3✔
954
        }
3✔
955
        clientOptions := service.ClientOptions{}
2✔
956
        clientOptions.InsecureAllowCredentialWithHTTP = true
2✔
957
        serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, &clientOptions)
2✔
958
        if err != nil {
2✔
959
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %s", accountName, err.Error()))
×
960
        }
×
961
        sasURL, err := serviceClient.GetSASURL(
2✔
962
                sas.AccountResourceTypes{Object: true, Service: false, Container: true},
2✔
963
                sas.AccountPermissions{Read: true, List: true, Write: true},
2✔
964
                time.Now().Add(time.Duration(expiryTime)*time.Minute),
2✔
965
                &service.GetSASURLOptions{StartTime: to.Ptr(time.Now())},
2✔
966
        )
2✔
967
        if err != nil {
2✔
968
                return "", err
×
969
        }
×
970
        u, err := url.Parse(sasURL)
2✔
971
        if err != nil {
2✔
972
                return "", err
×
973
        }
×
974
        sasToken := "?" + u.RawQuery
2✔
975
        d.azcopySasTokenCache.Set(accountName, sasToken)
2✔
976
        return sasToken, nil
2✔
977
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc