• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 14642486131

24 Apr 2025 01:10PM UTC coverage: 78.052% (-0.03%) from 78.078%
14642486131

Pull #1964

github

andyzhangx
fix: incorrect metris log in NodeStageVolume
Pull Request #1964: fix: incorrect metris log in NodeStageVolume

0 of 1 new or added line in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

2340 of 2998 relevant lines covered (78.05%)

7.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.09
/pkg/blob/controllerserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "os"
24
        "os/exec"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        "google.golang.org/grpc/codes"
30
        "google.golang.org/grpc/status"
31

32
        "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
33
        "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
34
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
35
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
36
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
37
        azstorage "github.com/Azure/azure-sdk-for-go/storage"
38
        "github.com/container-storage-interface/spec/lib/go/csi"
39

40
        "k8s.io/apimachinery/pkg/util/wait"
41
        "k8s.io/klog/v2"
42
        "k8s.io/utils/ptr"
43

44
        "sigs.k8s.io/blob-csi-driver/pkg/util"
45
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient/blobcontainerclient"
46
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
47
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
48
        "sigs.k8s.io/cloud-provider-azure/pkg/provider/storage"
49
)
50

51
const (
52
        privateEndpoint = "privateendpoint"
53

54
        azcopyAutoLoginType             = "AZCOPY_AUTO_LOGIN_TYPE"
55
        azcopySPAApplicationID          = "AZCOPY_SPA_APPLICATION_ID"
56
        azcopySPAClientSecret           = "AZCOPY_SPA_CLIENT_SECRET"
57
        azcopyTenantID                  = "AZCOPY_TENANT_ID"
58
        azcopyMSIClientID               = "AZCOPY_MSI_CLIENT_ID"
59
        MSI                             = "MSI"
60
        SPN                             = "SPN"
61
        authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
62

63
        createdByMetadata = "createdBy"
64
)
65

66
// CreateVolume provisions a volume
67
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
24✔
68
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
24✔
69
                klog.Errorf("invalid create volume req: %v", req)
×
70
                return nil, err
×
71
        }
×
72

73
        volName := req.GetName()
24✔
74
        if len(volName) == 0 {
25✔
75
                return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
1✔
76
        }
1✔
77

78
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
25✔
79
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
80
        }
2✔
81

82
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
21✔
83
        requestGiB := int(util.RoundUpGiB(volSizeBytes))
21✔
84

21✔
85
        volContentSource := req.GetVolumeContentSource()
21✔
86
        secrets := req.GetSecrets()
21✔
87

21✔
88
        parameters := req.GetParameters()
21✔
89
        if parameters == nil {
22✔
90
                parameters = make(map[string]string)
1✔
91
        }
1✔
92
        var storageAccountType, subsID, resourceGroup, location, account, containerName, containerNamePrefix, protocol, customTags, secretName, secretNamespace, pvcNamespace, tagValueDelimiter string
21✔
93
        var isHnsEnabled, requireInfraEncryption, enableBlobVersioning, createPrivateEndpoint, enableNfsV3, allowSharedKeyAccess *bool
21✔
94
        var vnetResourceGroup, vnetName, subnetName, accessTier, networkEndpointType, storageEndpointSuffix, fsGroupChangePolicy, srcAccountName string
21✔
95
        var matchTags, useDataPlaneAPI, getLatestAccountKey bool
21✔
96
        var softDeleteBlobs, softDeleteContainers int32
21✔
97
        var vnetResourceIDs []string
21✔
98
        var err error
21✔
99
        // set allowBlobPublicAccess as false by default
21✔
100
        allowBlobPublicAccess := ptr.To(false)
21✔
101

21✔
102
        containerNameReplaceMap := map[string]string{}
21✔
103

21✔
104
        // store account key to k8s secret by default
21✔
105
        storeAccountKey := true
21✔
106

21✔
107
        // Apply ProvisionerParameters (case-insensitive). We leave validation of
21✔
108
        // the values to the cloud provider.
21✔
109
        for k, v := range parameters {
122✔
110
                switch strings.ToLower(k) {
101✔
111
                case skuNameField:
9✔
112
                        storageAccountType = v
9✔
113
                case storageAccountTypeField:
10✔
114
                        storageAccountType = v
10✔
115
                case locationField:
9✔
116
                        location = v
9✔
117
                case storageAccountField:
10✔
118
                        account = v
10✔
119
                case subscriptionIDField:
×
120
                        subsID = v
×
121
                case resourceGroupField:
9✔
122
                        resourceGroup = v
9✔
123
                case containerNameField:
10✔
124
                        containerName = v
10✔
125
                case containerNamePrefixField:
2✔
126
                        containerNamePrefix = v
2✔
127
                case protocolField:
10✔
128
                        protocol = v
10✔
129
                case tagsField:
1✔
130
                        customTags = v
1✔
131
                case matchTagsField:
1✔
132
                        matchTags = strings.EqualFold(v, trueValue)
1✔
133
                case secretNameField:
×
134
                        secretName = v
×
135
                case secretNamespaceField:
×
136
                        secretNamespace = v
×
137
                case isHnsEnabledField:
×
138
                        if strings.EqualFold(v, trueValue) {
×
139
                                isHnsEnabled = ptr.To(true)
×
140
                        }
×
141
                case softDeleteBlobsField:
×
142
                        days, err := parseDays(v)
×
143
                        if err != nil {
×
144
                                return nil, err
×
145
                        }
×
146
                        softDeleteBlobs = days
×
147
                case softDeleteContainersField:
×
148
                        days, err := parseDays(v)
×
149
                        if err != nil {
×
150
                                return nil, err
×
151
                        }
×
152
                        softDeleteContainers = days
×
153
                case enableBlobVersioningField:
×
154
                        enableBlobVersioning = ptr.To(strings.EqualFold(v, trueValue))
×
155
                case storeAccountKeyField:
3✔
156
                        if strings.EqualFold(v, falseValue) {
5✔
157
                                storeAccountKey = false
2✔
158
                        }
2✔
159
                case getLatestAccountKeyField:
1✔
160
                        if getLatestAccountKey, err = strconv.ParseBool(v); err != nil {
2✔
161
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", getLatestAccountKeyField, v)
1✔
162
                        }
1✔
163
                case allowBlobPublicAccessField:
×
164
                        if strings.EqualFold(v, trueValue) {
×
165
                                allowBlobPublicAccess = ptr.To(true)
×
166
                        }
×
167
                case allowSharedKeyAccessField:
2✔
168
                        var boolValue bool
2✔
169
                        if boolValue, err = strconv.ParseBool(v); err != nil {
3✔
170
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", allowSharedKeyAccessField, v)
1✔
171
                        }
1✔
172
                        allowSharedKeyAccess = ptr.To(boolValue)
1✔
173
                case requireInfraEncryptionField:
×
174
                        if strings.EqualFold(v, trueValue) {
×
175
                                requireInfraEncryption = ptr.To(true)
×
176
                        }
×
177
                case pvcNamespaceKey:
×
178
                        pvcNamespace = v
×
179
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
180
                case pvcNameKey:
×
181
                        containerNameReplaceMap[pvcNameMetadata] = v
×
182
                case pvNameKey:
×
183
                        containerNameReplaceMap[pvNameMetadata] = v
×
184
                case serverNameField:
×
185
                case storageAuthTypeField:
1✔
186
                case storageIdentityClientIDField:
1✔
187
                case storageIdentityObjectIDField:
1✔
188
                case storageIdentityResourceIDField:
1✔
189
                case clientIDField:
1✔
190
                case tenantIDField:
1✔
191
                case msiEndpointField:
1✔
192
                case storageAADEndpointField:
1✔
193
                        // no op, only used in NodeStageVolume
194
                case storageEndpointSuffixField:
×
195
                        storageEndpointSuffix = v
×
196
                case vnetResourceGroupField:
×
197
                        vnetResourceGroup = v
×
198
                case vnetNameField:
×
199
                        vnetName = v
×
200
                case subnetNameField:
1✔
201
                        subnetName = v
1✔
202
                case accessTierField:
×
203
                        accessTier = v
×
204
                case networkEndpointTypeField:
1✔
205
                        networkEndpointType = v
1✔
206
                case mountPermissionsField:
11✔
207
                        // only do validations here, used in NodeStageVolume, NodePublishVolume
11✔
208
                        if v != "" {
22✔
209
                                if _, err := strconv.ParseUint(v, 8, 32); err != nil {
12✔
210
                                        return nil, status.Errorf(codes.InvalidArgument, "invalid mountPermissions %s in storage class", v)
1✔
211
                                }
1✔
212
                        }
213
                case useDataPlaneAPIField:
1✔
214
                        useDataPlaneAPI = strings.EqualFold(v, trueValue)
1✔
215
                case fsGroupChangePolicyField:
1✔
216
                        fsGroupChangePolicy = v
1✔
217
                case tagValueDelimiterField:
×
218
                        tagValueDelimiter = v
×
219
                default:
1✔
220
                        return nil, status.Errorf(codes.InvalidArgument, "invalid parameter %q in storage class", k)
1✔
221
                }
222
        }
223

224
        if ptr.Deref(enableBlobVersioning, false) {
17✔
225
                if isNFSProtocol(protocol) || ptr.Deref(isHnsEnabled, false) {
×
226
                        return nil, status.Errorf(codes.InvalidArgument, "enableBlobVersioning is not supported for NFS protocol or HNS enabled account")
×
227
                }
×
228
        }
229

230
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
18✔
231
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
232
        }
1✔
233

234
        if matchTags && account != "" {
17✔
235
                return nil, status.Errorf(codes.InvalidArgument, "matchTags must set as false when storageAccount(%s) is provided", account)
1✔
236
        }
1✔
237

238
        if resourceGroup == "" {
21✔
239
                resourceGroup = d.cloud.ResourceGroup
6✔
240
        }
6✔
241

242
        if secretNamespace == "" {
30✔
243
                if pvcNamespace == "" {
30✔
244
                        secretNamespace = defaultNamespace
15✔
245
                } else {
15✔
246
                        secretNamespace = pvcNamespace
×
247
                }
×
248
        }
249

250
        if protocol == "" {
20✔
251
                protocol = Fuse
5✔
252
        }
5✔
253
        if !isSupportedProtocol(protocol) {
16✔
254
                return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
1✔
255
        }
1✔
256
        if !isSupportedAccessTier(accessTier) {
14✔
257
                return nil, status.Errorf(codes.InvalidArgument, "accessTier(%s) is not supported, supported AccessTier list: %v", accessTier, armstorage.PossibleAccessTierValues())
×
258
        }
×
259

260
        if containerName != "" && containerNamePrefix != "" {
15✔
261
                return nil, status.Errorf(codes.InvalidArgument, "containerName(%s) and containerNamePrefix(%s) could not be specified together", containerName, containerNamePrefix)
1✔
262
        }
1✔
263
        if !isSupportedContainerNamePrefix(containerNamePrefix) {
14✔
264
                return nil, status.Errorf(codes.InvalidArgument, "containerNamePrefix(%s) can only contain lowercase letters, numbers, hyphens, and length should be less than 21", containerNamePrefix)
1✔
265
        }
1✔
266

267
        enableHTTPSTrafficOnly := true
12✔
268
        if strings.EqualFold(networkEndpointType, privateEndpoint) {
13✔
269
                if strings.Contains(subnetName, ",") {
2✔
270
                        return nil, status.Errorf(codes.InvalidArgument, "subnetName(%s) can only contain one subnet for private endpoint", subnetName)
1✔
271
                }
1✔
272
                createPrivateEndpoint = ptr.To(true)
×
273
        }
274
        accountKind := string(armstorage.KindStorageV2)
11✔
275
        if isNFSProtocol(protocol) {
12✔
276
                isHnsEnabled = ptr.To(true)
1✔
277
                enableNfsV3 = ptr.To(true)
1✔
278
                // NFS protocol does not need account key
1✔
279
                storeAccountKey = false
1✔
280
                if !ptr.Deref(createPrivateEndpoint, false) {
2✔
281
                        // set VirtualNetworkResourceIDs for storage account firewall setting
1✔
282
                        var err error
1✔
283
                        if vnetResourceIDs, err = d.updateSubnetServiceEndpoints(ctx, vnetResourceGroup, vnetName, subnetName); err != nil {
2✔
284
                                return nil, status.Errorf(codes.Internal, "update service endpoints failed with error: %v", err)
1✔
285
                        }
1✔
286
                }
287
        }
288

289
        if strings.HasPrefix(strings.ToLower(storageAccountType), "premium") {
11✔
290
                accountKind = string(armstorage.KindBlockBlobStorage)
1✔
291
        }
1✔
292
        if IsAzureStackCloud(d.cloud) {
11✔
293
                accountKind = string(armstorage.KindStorage)
1✔
294
                if storageAccountType != "" && storageAccountType != string(armstorage.SKUNameStandardLRS) && storageAccountType != string(armstorage.SKUNamePremiumLRS) {
2✔
295
                        return nil, status.Errorf(codes.InvalidArgument, "Invalid skuName value: %s, as Azure Stack only supports %s and %s Storage Account types.", storageAccountType, armstorage.SKUNamePremiumLRS, armstorage.SKUNameStandardLRS)
1✔
296
                }
1✔
297
        }
298

299
        tags, err := util.ConvertTagsToMap(customTags, tagValueDelimiter)
9✔
300
        if err != nil {
10✔
301
                return nil, status.Errorf(codes.InvalidArgument, "%v", err)
1✔
302
        }
1✔
303

304
        if strings.TrimSpace(storageEndpointSuffix) == "" {
16✔
305
                storageEndpointSuffix = d.getStorageEndPointSuffix()
8✔
306
        }
8✔
307

308
        if storeAccountKey && !ptr.Deref(allowSharedKeyAccess, true) {
9✔
309
                return nil, status.Errorf(codes.InvalidArgument, "storeAccountKey is not supported for account with shared access key disabled")
1✔
310
        }
1✔
311

312
        requestName := "controller_create_volume"
7✔
313
        if volContentSource != nil {
9✔
314
                switch volContentSource.Type.(type) {
2✔
315
                case *csi.VolumeContentSource_Snapshot:
1✔
316
                        return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
1✔
317
                case *csi.VolumeContentSource_Volume:
1✔
318
                        requestName = "controller_create_volume_from_volume"
1✔
319
                        if volContentSource.GetVolume() != nil {
2✔
320
                                sourceID := volContentSource.GetVolume().VolumeId
1✔
321
                                _, srcAccountName, _, _, _, err = GetContainerInfo(sourceID) //nolint:dogsled
1✔
322
                                if err != nil {
2✔
323
                                        klog.Errorf("failed to get source volume info from sourceID(%s), error: %v", sourceID, err)
1✔
324
                                } else {
1✔
325
                                        klog.V(2).Infof("source volume account name: %s, sourceID: %s", srcAccountName, sourceID)
×
326
                                }
×
327
                        }
328
                }
329
        }
330

331
        accountOptions := &storage.AccountOptions{
6✔
332
                Name:                            account,
6✔
333
                Type:                            storageAccountType,
6✔
334
                Kind:                            accountKind,
6✔
335
                SubscriptionID:                  subsID,
6✔
336
                ResourceGroup:                   resourceGroup,
6✔
337
                Location:                        location,
6✔
338
                EnableHTTPSTrafficOnly:          enableHTTPSTrafficOnly,
6✔
339
                VirtualNetworkResourceIDs:       vnetResourceIDs,
6✔
340
                Tags:                            tags,
6✔
341
                MatchTags:                       matchTags,
6✔
342
                IsHnsEnabled:                    isHnsEnabled,
6✔
343
                EnableNfsV3:                     enableNfsV3,
6✔
344
                AllowBlobPublicAccess:           allowBlobPublicAccess,
6✔
345
                AllowSharedKeyAccess:            allowSharedKeyAccess,
6✔
346
                RequireInfrastructureEncryption: requireInfraEncryption,
6✔
347
                VNetResourceGroup:               vnetResourceGroup,
6✔
348
                VNetName:                        vnetName,
6✔
349
                SubnetName:                      subnetName,
6✔
350
                AccessTier:                      accessTier,
6✔
351
                CreatePrivateEndpoint:           createPrivateEndpoint,
6✔
352
                StorageType:                     storage.StorageTypeBlob,
6✔
353
                StorageEndpointSuffix:           storageEndpointSuffix,
6✔
354
                EnableBlobVersioning:            enableBlobVersioning,
6✔
355
                SoftDeleteBlobs:                 softDeleteBlobs,
6✔
356
                SoftDeleteContainers:            softDeleteContainers,
6✔
357
                GetLatestAccountKey:             getLatestAccountKey,
6✔
358
                SourceAccountName:               srcAccountName,
6✔
359
        }
6✔
360

6✔
361
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
6✔
362
        validContainerName := containerName
6✔
363
        if validContainerName == "" {
7✔
364
                validContainerName = volName
1✔
365
                if containerNamePrefix != "" {
1✔
366
                        validContainerName = containerNamePrefix + "-" + volName
×
367
                }
×
368
                validContainerName = getValidContainerName(validContainerName, protocol)
1✔
369
                setKeyValueInMap(parameters, containerNameField, validContainerName)
1✔
370
        }
371

372
        if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
6✔
373
                // logging the job status if it's volume cloning
×
374
                if volContentSource != nil {
×
375
                        jobState, percent, err := d.azcopy.GetAzcopyJob(validContainerName, []string{})
×
376
                        return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
×
377
                }
×
378
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
×
379
        }
380
        defer d.volumeLocks.Release(volName)
6✔
381

6✔
382
        var volumeID string
6✔
383
        mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
6✔
384
        isOperationSucceeded := false
6✔
385
        defer func() {
12✔
386
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
6✔
387
        }()
6✔
388

389
        var accountKey string
6✔
390
        accountName := account
6✔
391
        if len(secrets) == 0 && accountName == "" {
7✔
392
                if v, ok := d.volMap.Load(volName); ok {
1✔
393
                        accountName = v.(string)
×
394
                } else {
1✔
395
                        lockKey := fmt.Sprintf("%s%s%s%s%s%v", storageAccountType, accountKind, resourceGroup, location, protocol, ptr.Deref(createPrivateEndpoint, false))
1✔
396
                        // search in cache first
1✔
397
                        cache, err := d.accountSearchCache.Get(ctx, lockKey, azcache.CacheReadTypeDefault)
1✔
398
                        if err != nil {
1✔
399
                                return nil, status.Errorf(codes.Internal, "%v", err)
×
400
                        }
×
401
                        if cache != nil {
1✔
402
                                accountName = cache.(string)
×
403
                        } else {
1✔
404
                                d.volLockMap.LockEntry(lockKey)
1✔
405
                                err = wait.ExponentialBackoff(getBackOff(d.cloud.Config), func() (bool, error) {
2✔
406
                                        var retErr error
1✔
407
                                        accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
1✔
408
                                        if isRetriableError(retErr) {
1✔
409
                                                klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
×
410
                                                return false, nil
×
411
                                        }
×
412
                                        return true, retErr
1✔
413
                                })
414
                                d.volLockMap.UnlockEntry(lockKey)
1✔
415
                                if err != nil {
2✔
416
                                        return nil, status.Errorf(codes.Internal, "ensure storage account failed with %v", err)
1✔
417
                                }
1✔
418
                                d.accountSearchCache.Set(lockKey, accountName)
×
419
                                d.volMap.Store(volName, accountName)
×
420
                        }
421
                }
422
        }
423

424
        if ptr.Deref(createPrivateEndpoint, false) && isNFSProtocol(protocol) {
5✔
425
                // As for blobfuse/blobfuse2, serverName, i.e.,AZURE_STORAGE_BLOB_ENDPOINT env variable can't include
×
426
                // "privatelink", issue: https://github.com/Azure/azure-storage-fuse/issues/1014
×
427
                //
×
428
                // And use public endpoint will be befine to blobfuse/blobfuse2, because it will be resolved to private endpoint
×
429
                // by private dns zone, which includes CNAME record, documented here:
×
430
                // https://learn.microsoft.com/en-us/azure/storage/common/storage-private-endpoints?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#dns-changes-for-private-endpoints
×
431
                setKeyValueInMap(parameters, serverNameField, fmt.Sprintf("%s.privatelink.blob.%s", accountName, storageEndpointSuffix))
×
432
        }
×
433

434
        accountOptions.Name = accountName
5✔
435
        if len(secrets) == 0 && useDataPlaneAPI {
6✔
436
                if accountKey == "" {
2✔
437
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
2✔
438
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
439
                        }
1✔
440
                }
441
                secrets = createStorageAccountSecret(accountName, accountKey)
×
442
        }
443

444
        klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
4✔
445
        if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, storageEndpointSuffix, secrets); err != nil {
5✔
446
                return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
1✔
447
        }
1✔
448
        if volContentSource != nil {
4✔
449
                accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, false)
1✔
450
                if err != nil {
1✔
451
                        return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
452
                }
×
453
                copyErr := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
1✔
454
                if accountSASToken == "" && copyErr != nil && strings.Contains(copyErr.Error(), authorizationPermissionMismatch) {
1✔
455
                        klog.Warningf("azcopy copy failed with AuthorizationPermissionMismatch error, should assign \"Storage Blob Data Contributor\" role to controller identity, fall back to use sas token, original error: %v", copyErr)
×
456
                        accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, true)
×
457
                        if err != nil {
×
458
                                return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
459
                        }
×
460
                        copyErr = d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
×
461
                }
462
                if copyErr != nil {
2✔
463
                        return nil, copyErr
1✔
464
                }
1✔
465
        }
466

467
        if storeAccountKey && len(req.GetSecrets()) == 0 {
4✔
468
                if accountKey == "" {
4✔
469
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
3✔
470
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
471
                        }
1✔
472
                }
473

474
                secretName, err := setAzureCredentials(ctx, d.KubeClient, accountName, accountKey, secretNamespace)
1✔
475
                if err != nil {
1✔
476
                        return nil, status.Errorf(codes.Internal, "failed to store storage account key: %v", err)
×
477
                }
×
478
                if secretName != "" {
1✔
479
                        klog.V(2).Infof("store account key to k8s secret(%v) in %s namespace", secretName, secretNamespace)
×
480
                }
×
481
        }
482

483
        var uuid string
1✔
484
        if containerName != "" {
2✔
485
                // add volume name as suffix to differentiate volumeID since "containerName" is specified
1✔
486
                // not necessary for dynamic container name creation since volumeID already contains volume name
1✔
487
                uuid = volName
1✔
488
        }
1✔
489
        volumeID = fmt.Sprintf(volumeIDTemplate, resourceGroup, accountName, validContainerName, uuid, secretNamespace, subsID)
1✔
490
        klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)
1✔
491

1✔
492
        if useDataPlaneAPI {
1✔
493
                d.dataPlaneAPIVolCache.Set(volumeID, "")
×
494
                d.dataPlaneAPIVolCache.Set(accountName, "")
×
495
        }
×
496

497
        isOperationSucceeded = true
1✔
498
        // reset secretNamespace field in VolumeContext
1✔
499
        setKeyValueInMap(parameters, secretNamespaceField, secretNamespace)
1✔
500
        return &csi.CreateVolumeResponse{
1✔
501
                Volume: &csi.Volume{
1✔
502
                        VolumeId:      volumeID,
1✔
503
                        CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
1✔
504
                        VolumeContext: parameters,
1✔
505
                        ContentSource: volContentSource,
1✔
506
                },
1✔
507
        }, nil
1✔
508
}
509

510
// DeleteVolume delete a volume
511
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
5✔
512
        volumeID := req.GetVolumeId()
5✔
513
        if len(volumeID) == 0 {
6✔
514
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
515
        }
1✔
516

517
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
4✔
518
                return nil, status.Errorf(codes.Internal, "invalid delete volume req: %v", req)
×
519
        }
×
520

521
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
4✔
522
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
523
        }
×
524
        defer d.volumeLocks.Release(volumeID)
4✔
525

4✔
526
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
4✔
527
        if err != nil {
5✔
528
                // According to CSI Driver Sanity Tester, should succeed when an invalid volume id is used
1✔
529
                klog.Errorf("GetContainerInfo(%s) in DeleteVolume failed with error: %v", volumeID, err)
1✔
530
                return &csi.DeleteVolumeResponse{}, nil
1✔
531
        }
1✔
532

533
        secrets := req.GetSecrets()
3✔
534
        if len(secrets) == 0 && d.useDataPlaneAPI(ctx, volumeID, accountName) {
4✔
535
                _, accountName, accountKey, _, _, err := d.GetAuthEnv(ctx, volumeID, "", nil, secrets)
1✔
536
                if err != nil {
2✔
537
                        return nil, status.Errorf(codes.Internal, "GetAuthEnv(%s) failed with %v", volumeID, err)
1✔
538
                }
1✔
539
                if accountName != "" && accountKey != "" {
×
540
                        secrets = createStorageAccountSecret(accountName, accountKey)
×
541
                }
×
542
        }
543

544
        mc := metrics.NewMetricContext(blobCSIDriverName, "controller_delete_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
2✔
545
        isOperationSucceeded := false
2✔
546
        defer func() {
4✔
547
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
2✔
548
        }()
2✔
549

550
        if resourceGroupName == "" {
3✔
551
                resourceGroupName = d.cloud.ResourceGroup
1✔
552
        }
1✔
553
        klog.V(2).Infof("deleting container(%s) rg(%s) account(%s) volumeID(%s)", containerName, resourceGroupName, accountName, volumeID)
2✔
554
        if err := d.DeleteBlobContainer(ctx, subsID, resourceGroupName, accountName, containerName, secrets); err != nil {
4✔
555
                return nil, status.Errorf(codes.Internal, "failed to delete container(%s) under rg(%s) account(%s) volumeID(%s), error: %v", containerName, resourceGroupName, accountName, volumeID, err)
2✔
556
        }
2✔
557

558
        isOperationSucceeded = true
×
559
        klog.V(2).Infof("container(%s) under rg(%s) account(%s) volumeID(%s) is deleted successfully", containerName, resourceGroupName, accountName, volumeID)
×
560
        return &csi.DeleteVolumeResponse{}, nil
×
561
}
562

563
// ValidateVolumeCapabilities return the capabilities of the volume
564
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
8✔
565
        volumeID := req.GetVolumeId()
8✔
566
        if len(volumeID) == 0 {
9✔
567
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
568
        }
1✔
569
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
9✔
570
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
571
        }
2✔
572

573
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
5✔
574
        if err != nil {
6✔
575
                klog.Errorf("GetContainerInfo(%s) in ValidateVolumeCapabilities failed with error: %v", volumeID, err)
1✔
576
                return nil, status.Error(codes.NotFound, err.Error())
1✔
577
        }
1✔
578

579
        var exist bool
4✔
580
        secrets := req.GetSecrets()
4✔
581
        if len(secrets) > 0 {
5✔
582
                container, err := getContainerReference(containerName, secrets, d.getStorageEndPointSuffix())
1✔
583
                if err != nil {
2✔
584
                        return nil, status.Error(codes.Internal, err.Error())
1✔
585
                }
1✔
586
                exist, err = container.Exists()
×
587
                if err != nil {
×
588
                        return nil, status.Error(codes.Internal, err.Error())
×
589
                }
×
590
        } else {
3✔
591
                if resourceGroupName == "" {
3✔
592
                        resourceGroupName = d.cloud.ResourceGroup
×
593
                }
×
594
                blobClient, err := d.clientFactory.GetBlobContainerClientForSub(subsID)
3✔
595
                if err != nil {
3✔
596
                        return nil, status.Error(codes.Internal, err.Error())
×
597
                }
×
598

599
                blobContainer, err := blobClient.Get(ctx, resourceGroupName, accountName, containerName)
3✔
600
                if err != nil {
4✔
601
                        return nil, status.Error(codes.Internal, err.Error())
1✔
602
                }
1✔
603
                if blobContainer.ContainerProperties == nil {
3✔
604
                        return nil, status.Errorf(codes.Internal, "ContainerProperties of volume(%s) is nil", volumeID)
1✔
605
                }
1✔
606
                exist = blobContainer.ContainerProperties.Deleted != nil && !*blobContainer.ContainerProperties.Deleted
1✔
607
        }
608
        if !exist {
2✔
609
                return nil, status.Errorf(codes.NotFound, "requested volume(%s) does not exist", volumeID)
1✔
610
        }
1✔
611
        klog.V(2).Infof("ValidateVolumeCapabilities on volume(%s) succeeded", volumeID)
×
612

×
613
        // blob driver supports all AccessModes, no need to check capabilities here
×
614
        return &csi.ValidateVolumeCapabilitiesResponse{
×
615
                Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
×
616
                        VolumeCapabilities: req.GetVolumeCapabilities(),
×
617
                },
×
618
                Message: "",
×
619
        }, nil
×
620
}
621

622
// ControllerModifyVolume modify volume
623
func (d *Driver) ControllerModifyVolume(_ context.Context, _ *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
×
624
        return nil, status.Error(codes.Unimplemented, "")
×
625
}
×
626

627
func (d *Driver) ControllerPublishVolume(_ context.Context, _ *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
1✔
628
        return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not yet implemented")
1✔
629
}
1✔
630

631
func (d *Driver) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
1✔
632
        return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not yet implemented")
1✔
633
}
1✔
634

635
// ControllerGetVolume get volume
636
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
1✔
637
        return nil, status.Error(codes.Unimplemented, "ControllerGetVolume is not yet implemented")
1✔
638
}
1✔
639

640
// GetCapacity returns the capacity of the total available storage pool
641
func (d *Driver) GetCapacity(_ context.Context, _ *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
1✔
642
        return nil, status.Error(codes.Unimplemented, "GetCapacity is not yet implemented")
1✔
643
}
1✔
644

645
// ListVolumes return all available volumes
646
func (d *Driver) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
1✔
647
        return nil, status.Error(codes.Unimplemented, "ListVolumes is not yet implemented")
1✔
648
}
1✔
649

650
// CreateSnapshot create snapshot
651
func (d *Driver) CreateSnapshot(_ context.Context, _ *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
1✔
652
        return nil, status.Error(codes.Unimplemented, "CreateSnapshot is not yet implemented")
1✔
653
}
1✔
654

655
// DeleteSnapshot delete snapshot
656
func (d *Driver) DeleteSnapshot(_ context.Context, _ *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
1✔
657
        return nil, status.Error(codes.Unimplemented, "DeleteSnapshot is not yet implemented")
1✔
658
}
1✔
659

660
// ListSnapshots list snapshots
661
func (d *Driver) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
1✔
662
        return nil, status.Error(codes.Unimplemented, "ListSnapshots is not yet implemented")
1✔
663
}
1✔
664

665
// ControllerGetCapabilities returns the capabilities of the Controller plugin
666
func (d *Driver) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
1✔
667
        return &csi.ControllerGetCapabilitiesResponse{
1✔
668
                Capabilities: d.Cap,
1✔
669
        }, nil
1✔
670
}
1✔
671

672
// ControllerExpandVolume controller expand volume
673
func (d *Driver) ControllerExpandVolume(_ context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
4✔
674
        if len(req.GetVolumeId()) == 0 {
5✔
675
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
676
        }
1✔
677

678
        if req.GetCapacityRange() == nil {
4✔
679
                return nil, status.Error(codes.InvalidArgument, "Capacity Range missing in request")
1✔
680
        }
1✔
681

682
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
2✔
683
                return nil, status.Errorf(codes.Internal, "invalid expand volume req: %v", req)
×
684
        }
×
685

686
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
2✔
687
        requestGiB := int64(util.RoundUpGiB(volSizeBytes))
2✔
688

2✔
689
        if volSizeBytes > containerMaxSize {
3✔
690
                return nil, status.Errorf(codes.OutOfRange, "required bytes (%d) exceeds the maximum supported bytes (%d)", volSizeBytes, containerMaxSize)
1✔
691
        }
1✔
692

693
        klog.V(2).Infof("ControllerExpandVolume(%s) successfully, currentQuota: %d Gi", req.VolumeId, requestGiB)
1✔
694

1✔
695
        return &csi.ControllerExpandVolumeResponse{CapacityBytes: req.GetCapacityRange().GetRequiredBytes()}, nil
1✔
696
}
697

698
// CreateBlobContainer creates a blob container
699
func (d *Driver) CreateBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName, storageEndpointSuffix string, secrets map[string]string) error {
10✔
700
        if containerName == "" {
11✔
701
                return fmt.Errorf("containerName is empty")
1✔
702
        }
1✔
703
        return wait.ExponentialBackoff(getBackOff(d.cloud.Config), func() (bool, error) {
18✔
704
                var err error
9✔
705
                if len(secrets) > 0 {
10✔
706
                        container, getErr := getContainerReference(containerName, secrets, storageEndpointSuffix)
1✔
707
                        if getErr != nil {
2✔
708
                                return true, getErr
1✔
709
                        }
1✔
710
                        container.Metadata = map[string]string{createdByMetadata: d.Name}
×
711
                        _, err = container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
×
712
                } else {
8✔
713
                        blobContainer := armstorage.BlobContainer{
8✔
714
                                ContainerProperties: &armstorage.ContainerProperties{
8✔
715
                                        PublicAccess: to.Ptr(armstorage.PublicAccessNone),
8✔
716
                                        Metadata:     map[string]*string{createdByMetadata: to.Ptr(d.Name)},
8✔
717
                                },
8✔
718
                        }
8✔
719
                        var blobClient blobcontainerclient.Interface
8✔
720
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
8✔
721
                        if err != nil {
8✔
722
                                return true, err
×
723
                        }
×
724
                        _, err = blobClient.CreateContainer(ctx, resourceGroupName, accountName, containerName, blobContainer)
8✔
725
                }
726
                if err != nil {
12✔
727
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
4✔
728
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) {
6✔
729
                                klog.Warningf("CreateContainer(%s, %s, %s) failed with error(%v), retry", resourceGroupName, accountName, containerName, err)
2✔
730
                                return false, nil
2✔
731
                        }
2✔
732
                }
733
                return true, err
6✔
734
        })
735
}
736

737
// DeleteBlobContainer deletes a blob container
738
func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
8✔
739
        if containerName == "" {
9✔
740
                return fmt.Errorf("containerName is empty")
1✔
741
        }
1✔
742
        return wait.ExponentialBackoff(getBackOff(d.cloud.Config), func() (bool, error) {
14✔
743
                var err error
7✔
744
                if len(secrets) > 0 {
10✔
745
                        container, getErr := getContainerReference(containerName, secrets, d.getStorageEndPointSuffix())
3✔
746
                        if getErr != nil {
6✔
747
                                return true, getErr
3✔
748
                        }
3✔
749
                        _, err = container.DeleteIfExists(nil)
×
750
                } else {
4✔
751
                        var blobClient blobcontainerclient.Interface
4✔
752
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
4✔
753
                        if err != nil {
4✔
754
                                return true, err
×
755
                        }
×
756
                        err = blobClient.DeleteContainer(ctx, resourceGroupName, accountName, containerName)
4✔
757
                }
758
                if err != nil {
7✔
759
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
3✔
760
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) ||
3✔
761
                                strings.Contains(err.Error(), statusCodeNotFound) ||
3✔
762
                                strings.Contains(err.Error(), httpCodeNotFound) {
5✔
763
                                klog.Warningf("delete container(%s) on account(%s) failed with error(%v), return as success", containerName, accountName, err)
2✔
764
                                return true, nil
2✔
765
                        }
2✔
766
                        return false, fmt.Errorf("failed to delete container(%s) on account(%s), error: %w", containerName, accountName, err)
1✔
767
                }
768
                return true, err
1✔
769
        })
770
}
771

772
// copyBlobContainer copies source volume content into a destination volume
773
func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName string, dstAccountSasToken string, authAzcopyEnv []string, dstContainerName string, secretNamespace string, accountOptions *storage.AccountOptions, storageEndpointSuffix string) error {
7✔
774
        var sourceVolumeID string
7✔
775
        if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
14✔
776
                sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
7✔
777

7✔
778
        }
7✔
779
        srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
7✔
780
        if err != nil {
9✔
781
                return status.Error(codes.NotFound, err.Error())
2✔
782
        }
2✔
783
        if dstAccountName == "" {
9✔
784
                dstAccountName = srcAccountName
4✔
785
        }
4✔
786
        if srcAccountName == "" || srcContainerName == "" || dstContainerName == "" {
7✔
787
                return fmt.Errorf("srcAccountName(%s) or srcContainerName(%s) or dstContainerName(%s) is empty", srcAccountName, srcContainerName, dstContainerName)
2✔
788
        }
2✔
789
        srcAccountSasToken := dstAccountSasToken
3✔
790
        if srcAccountName != dstAccountName && dstAccountSasToken != "" {
4✔
791
                srcAccountOptions := &storage.AccountOptions{
1✔
792
                        Name:                srcAccountName,
1✔
793
                        ResourceGroup:       srcResourceGroupName,
1✔
794
                        SubscriptionID:      srcSubscriptionID,
1✔
795
                        GetLatestAccountKey: accountOptions.GetLatestAccountKey,
1✔
796
                }
1✔
797
                if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace, true); err != nil {
2✔
798
                        return err
1✔
799
                }
1✔
800
        }
801
        srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", srcAccountName, storageEndpointSuffix, srcContainerName, srcAccountSasToken)
2✔
802
        dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", dstAccountName, storageEndpointSuffix, dstContainerName, dstAccountSasToken)
2✔
803

2✔
804
        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
2✔
805
        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
2✔
806
        switch jobState {
2✔
807
        case util.AzcopyJobError, util.AzcopyJobCompleted, util.AzcopyJobCompletedWithErrors, util.AzcopyJobCompletedWithSkipped, util.AzcopyJobCompletedWithErrorsAndSkipped:
1✔
808
                return err
1✔
809
        case util.AzcopyJobRunning:
1✔
810
                err = wait.PollUntilContextTimeout(ctx, 20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, true, func(context.Context) (bool, error) {
4✔
811
                        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
3✔
812
                        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
3✔
813
                        if err != nil {
3✔
814
                                return false, err
×
815
                        }
×
816
                        if jobState == util.AzcopyJobRunning {
6✔
817
                                return false, nil
3✔
818
                        }
3✔
819
                        return true, nil
×
820
                })
821
        case util.AzcopyJobNotFound:
×
822
                klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName)
×
823
                execFunc := func() error {
×
824
                        if out, err := d.execAzcopyCopy(srcPath, dstPath, azcopyCloneVolumeOptions, authAzcopyEnv); err != nil {
×
825
                                return fmt.Errorf("exec error: %v, output: %v", err, string(out))
×
826
                        }
×
827
                        return nil
×
828
                }
829
                timeoutFunc := func() error {
×
830
                        _, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
×
831
                        return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
×
832
                }
×
833
                err = util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
×
834
        }
835
        if err != nil {
2✔
836
                klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, err)
1✔
837
        } else {
1✔
838
                klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
×
839
                if out, err := d.azcopy.CleanJobs(); err != nil {
×
840
                        klog.Warningf("clean azcopy jobs failed with error: %v, output: %s", err, string(out))
×
841
                }
×
842
        }
843
        return err
1✔
844
}
845

846
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
847
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName string, accountSASToken string, authAzcopyEnv []string, dstContainerName, secretNamespace string, accountOptions *storage.AccountOptions, storageEndpointSuffix string) error {
8✔
848
        vs := req.VolumeContentSource
8✔
849
        switch vs.Type.(type) {
8✔
850
        case *csi.VolumeContentSource_Snapshot:
1✔
851
                return status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
1✔
852
        case *csi.VolumeContentSource_Volume:
7✔
853
                return d.copyBlobContainer(ctx, req, accountName, accountSASToken, authAzcopyEnv, dstContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
7✔
854
        default:
×
855
                return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
×
856
        }
857
}
858

859
// execAzcopyCopy exec azcopy copy command
860
func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) {
×
861
        cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
×
862
        cmd.Args = append(cmd.Args, azcopyCopyOptions...)
×
863
        if len(authAzcopyEnv) > 0 {
×
864
                cmd.Env = append(os.Environ(), authAzcopyEnv...)
×
865
        }
×
866
        return cmd.CombinedOutput()
×
867
}
868

869
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
870
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
6✔
871
        azureAuthConfig := d.cloud.Config.AzureAuthConfig
6✔
872
        azureClientConfig := d.cloud.Config.AzureClientConfig
6✔
873
        var authAzcopyEnv []string
6✔
874
        if azureAuthConfig.UseManagedIdentityExtension {
9✔
875
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
3✔
876
                if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
4✔
877
                        klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
1✔
878
                        authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
1✔
879
                } else {
3✔
880
                        klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
2✔
881
                }
2✔
882
                return authAzcopyEnv, nil
3✔
883
        }
884
        if len(azureAuthConfig.AADClientSecret) > 0 {
5✔
885
                klog.V(2).Infof("use service principal to authorize azcopy")
2✔
886
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
2✔
887
                if azureAuthConfig.AADClientID == "" || azureClientConfig.TenantID == "" {
3✔
888
                        return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
1✔
889
                }
1✔
890
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
1✔
891
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
1✔
892
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureClientConfig.TenantID))
1✔
893
                klog.V(2).Infof("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureClientConfig.TenantID)
1✔
894

1✔
895
                return authAzcopyEnv, nil
1✔
896
        }
897
        return []string{}, fmt.Errorf("service principle or managed identity are both not set")
1✔
898
}
899

900
// getAzcopyAuth will only generate sas token for azcopy in following conditions:
901
// 1. secrets is not empty
902
// 2. driver is not using managed identity and service principal
903
// 3. parameter useSasToken is true
904
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *storage.AccountOptions, secrets map[string]string, secretName, secretNamespace string, useSasToken bool) (string, []string, error) {
6✔
905
        var authAzcopyEnv []string
6✔
906
        var err error
6✔
907
        if !useSasToken && !d.useDataPlaneAPI(ctx, "", accountName) && len(secrets) == 0 && len(secretName) == 0 {
7✔
908
                // search in cache first
1✔
909
                if cache, err := d.azcopySasTokenCache.Get(ctx, accountName, azcache.CacheReadTypeDefault); err == nil && cache != nil {
1✔
910
                        klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
911
                        return cache.(string), nil, nil
×
912
                }
×
913

914
                authAzcopyEnv, err = d.authorizeAzcopyWithIdentity()
1✔
915
                if err != nil {
1✔
916
                        klog.Warningf("failed to authorize azcopy with identity, error: %v", err)
×
917
                }
×
918
        }
919

920
        if len(secrets) > 0 || len(secretName) > 0 || len(authAzcopyEnv) == 0 || useSasToken {
11✔
921
                if accountKey == "" {
10✔
922
                        if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
7✔
923
                                return "", nil, err
2✔
924
                        }
2✔
925
                }
926
                klog.V(2).Infof("generate sas token for account(%s)", accountName)
3✔
927
                sasToken, err := d.generateSASToken(ctx, accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
3✔
928
                return sasToken, nil, err
3✔
929
        }
930
        return "", authAzcopyEnv, nil
1✔
931
}
932

933
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
934
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
30✔
935
        if len(volCaps) == 0 {
32✔
936
                return fmt.Errorf("volume capabilities missing in request")
2✔
937
        }
2✔
938
        for _, c := range volCaps {
56✔
939
                if c.GetBlock() != nil {
30✔
940
                        return fmt.Errorf("block volume capability not supported")
2✔
941
                }
2✔
942
        }
943
        return nil
26✔
944
}
945

946
func parseDays(dayStr string) (int32, error) {
3✔
947
        days, err := strconv.Atoi(dayStr)
3✔
948
        if err != nil {
4✔
949
                return 0, status.Errorf(codes.InvalidArgument, "invalid %s:%s in storage class", softDeleteBlobsField, dayStr)
1✔
950
        }
1✔
951
        if days <= 0 || days > 365 {
3✔
952
                return 0, status.Errorf(codes.InvalidArgument, "invalid %s:%s in storage class, should be in range [1, 365]", softDeleteBlobsField, dayStr)
1✔
953
        }
1✔
954

955
        return int32(days), nil
1✔
956
}
957

958
// generateSASToken generate a sas token for storage account
959
func (d *Driver) generateSASToken(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
5✔
960
        // search in cache first
5✔
961
        cache, err := d.azcopySasTokenCache.Get(ctx, accountName, azcache.CacheReadTypeDefault)
5✔
962
        if err != nil {
5✔
963
                return "", fmt.Errorf("get(%s) from azcopySasTokenCache failed with error: %v", accountName, err)
×
964
        }
×
965
        if cache != nil {
5✔
966
                klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
967
                return cache.(string), nil
×
968
        }
×
969

970
        credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
5✔
971
        if err != nil {
8✔
972
                return "", status.Errorf(codes.Internal, "failed to generate sas token in creating new shared key credential, accountName: %s, err: %v", accountName, err)
3✔
973
        }
3✔
974
        clientOptions := service.ClientOptions{}
2✔
975
        clientOptions.InsecureAllowCredentialWithHTTP = true
2✔
976
        serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, &clientOptions)
2✔
977
        if err != nil {
2✔
978
                return "", status.Errorf(codes.Internal, "failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %v", accountName, err)
×
979
        }
×
980
        sasURL, err := serviceClient.GetSASURL(
2✔
981
                sas.AccountResourceTypes{Object: true, Service: false, Container: true},
2✔
982
                sas.AccountPermissions{Read: true, List: true, Write: true},
2✔
983
                time.Now().Add(time.Duration(expiryTime)*time.Minute),
2✔
984
                &service.GetSASURLOptions{StartTime: to.Ptr(time.Now())},
2✔
985
        )
2✔
986
        if err != nil {
2✔
987
                return "", err
×
988
        }
×
989
        u, err := url.Parse(sasURL)
2✔
990
        if err != nil {
2✔
991
                return "", err
×
992
        }
×
993
        sasToken := "?" + u.RawQuery
2✔
994
        d.azcopySasTokenCache.Set(accountName, sasToken)
2✔
995
        return sasToken, nil
2✔
996
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc