• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 16359912094

18 Jul 2025 01:28AM UTC coverage: 80.461%. Remained the same
16359912094

Pull #2086

github

andyzhangx
feat: install blobfuse 2.5.0 as default version
Pull Request #2086: feat: install blobfuse 2.5.0 as default version

2446 of 3040 relevant lines covered (80.46%)

8.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.39
/pkg/blob/controllerserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "os"
24
        "os/exec"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        "google.golang.org/grpc/codes"
30
        "google.golang.org/grpc/status"
31

32
        "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
33
        "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
34
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
35
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
36
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
37
        azstorage "github.com/Azure/azure-sdk-for-go/storage"
38
        "github.com/container-storage-interface/spec/lib/go/csi"
39

40
        "k8s.io/apimachinery/pkg/util/wait"
41
        "k8s.io/klog/v2"
42
        "k8s.io/utils/ptr"
43

44
        "sigs.k8s.io/blob-csi-driver/pkg/util"
45
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient/blobcontainerclient"
46
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
47
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
48
        "sigs.k8s.io/cloud-provider-azure/pkg/provider/storage"
49
)
50

51
// Package-level constants used by the controller server.
const (
52
        // privateEndpoint is the networkEndpointType value (compared
        // case-insensitively in CreateVolume) that requests a private endpoint
        // for the storage account.
        privateEndpoint = "privateendpoint"
53

54
        // The azcopy* names below look like azcopy environment variable names.
        // NOTE(review): their consumers are not in this part of the file - confirm.
        azcopyAutoLoginType             = "AZCOPY_AUTO_LOGIN_TYPE"
55
        azcopySPAApplicationID          = "AZCOPY_SPA_APPLICATION_ID"
56
        azcopySPAClientSecret           = "AZCOPY_SPA_CLIENT_SECRET"
57
        azcopyTenantID                  = "AZCOPY_TENANT_ID"
58
        azcopyMSIClientID               = "AZCOPY_MSI_CLIENT_ID"
59
        // MSI and SPN are presumably the accepted values for
        // AZCOPY_AUTO_LOGIN_TYPE (managed identity / service principal) -
        // TODO confirm against the azcopy auth helper.
        MSI                             = "MSI"
60
        SPN                             = "SPN"
61
        // authorizationPermissionMismatch is the substring CreateVolume looks for
        // in a copy error before retrying the copy with a SAS token.
        authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
62

63
        // createdByMetadata is presumably a container metadata key; its use is
        // not visible in this part of the file - TODO confirm.
        createdByMetadata = "createdBy"
64
)
65

66
// CreateVolume provisions a volume
67
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
25✔
68
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
25✔
69
                klog.Errorf("invalid create volume req: %v", req)
×
70
                return nil, err
×
71
        }
×
72

73
        volName := req.GetName()
25✔
74
        if len(volName) == 0 {
26✔
75
                return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
1✔
76
        }
1✔
77

78
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
26✔
79
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
80
        }
2✔
81

82
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
22✔
83
        requestGiB := int(util.RoundUpGiB(volSizeBytes))
22✔
84

22✔
85
        volContentSource := req.GetVolumeContentSource()
22✔
86
        secrets := req.GetSecrets()
22✔
87

22✔
88
        parameters := req.GetParameters()
22✔
89
        if parameters == nil {
23✔
90
                parameters = make(map[string]string)
1✔
91
        }
1✔
92
        var storageAccountType, subsID, resourceGroup, location, account, containerName, containerNamePrefix, protocol, customTags, secretName, secretNamespace, pvcNamespace, tagValueDelimiter string
22✔
93
        var isHnsEnabled, requireInfraEncryption, enableBlobVersioning, createPrivateEndpoint, enableNfsV3, allowSharedKeyAccess *bool
22✔
94
        var vnetResourceGroup, vnetName, vnetLinkName, publicNetworkAccess, subnetName, accessTier, networkEndpointType, storageEndpointSuffix, fsGroupChangePolicy, srcAccountName string
22✔
95
        var matchTags, useDataPlaneAPI, getLatestAccountKey bool
22✔
96
        var softDeleteBlobs, softDeleteContainers int32
22✔
97
        var vnetResourceIDs []string
22✔
98
        var err error
22✔
99
        // set allowBlobPublicAccess as false by default
22✔
100
        allowBlobPublicAccess := ptr.To(false)
22✔
101

22✔
102
        containerNameReplaceMap := map[string]string{}
22✔
103

22✔
104
        // store account key to k8s secret by default
22✔
105
        storeAccountKey := true
22✔
106

22✔
107
        // Apply ProvisionerParameters (case-insensitive). We leave validation of
22✔
108
        // the values to the cloud provider.
22✔
109
        for k, v := range parameters {
130✔
110
                switch strings.ToLower(k) {
108✔
111
                case skuNameField:
9✔
112
                        storageAccountType = v
9✔
113
                case storageAccountTypeField:
10✔
114
                        storageAccountType = v
10✔
115
                case locationField:
10✔
116
                        location = v
10✔
117
                case storageAccountField:
11✔
118
                        account = v
11✔
119
                case subscriptionIDField:
×
120
                        subsID = v
×
121
                case resourceGroupField:
10✔
122
                        resourceGroup = v
10✔
123
                case containerNameField:
11✔
124
                        containerName = v
11✔
125
                case containerNamePrefixField:
2✔
126
                        containerNamePrefix = v
2✔
127
                case protocolField:
10✔
128
                        protocol = v
10✔
129
                case tagsField:
1✔
130
                        customTags = v
1✔
131
                case matchTagsField:
1✔
132
                        matchTags = strings.EqualFold(v, trueValue)
1✔
133
                case secretNameField:
×
134
                        secretName = v
×
135
                case secretNamespaceField:
×
136
                        secretNamespace = v
×
137
                case isHnsEnabledField:
×
138
                        if strings.EqualFold(v, trueValue) {
×
139
                                isHnsEnabled = ptr.To(true)
×
140
                        }
×
141
                case softDeleteBlobsField:
×
142
                        days, err := parseDays(v)
×
143
                        if err != nil {
×
144
                                return nil, err
×
145
                        }
×
146
                        softDeleteBlobs = days
×
147
                case softDeleteContainersField:
×
148
                        days, err := parseDays(v)
×
149
                        if err != nil {
×
150
                                return nil, err
×
151
                        }
×
152
                        softDeleteContainers = days
×
153
                case enableBlobVersioningField:
×
154
                        enableBlobVersioning = ptr.To(strings.EqualFold(v, trueValue))
×
155
                case storeAccountKeyField:
3✔
156
                        if strings.EqualFold(v, falseValue) {
5✔
157
                                storeAccountKey = false
2✔
158
                        }
2✔
159
                case getLatestAccountKeyField:
1✔
160
                        if getLatestAccountKey, err = strconv.ParseBool(v); err != nil {
2✔
161
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", getLatestAccountKeyField, v)
1✔
162
                        }
1✔
163
                case allowBlobPublicAccessField:
×
164
                        if strings.EqualFold(v, trueValue) {
×
165
                                allowBlobPublicAccess = ptr.To(true)
×
166
                        }
×
167
                case publicNetworkAccessField:
1✔
168
                        publicNetworkAccess = v
1✔
169
                case allowSharedKeyAccessField:
2✔
170
                        var boolValue bool
2✔
171
                        if boolValue, err = strconv.ParseBool(v); err != nil {
3✔
172
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", allowSharedKeyAccessField, v)
1✔
173
                        }
1✔
174
                        allowSharedKeyAccess = ptr.To(boolValue)
1✔
175
                case requireInfraEncryptionField:
×
176
                        if strings.EqualFold(v, trueValue) {
×
177
                                requireInfraEncryption = ptr.To(true)
×
178
                        }
×
179
                case pvcNamespaceKey:
×
180
                        pvcNamespace = v
×
181
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
182
                case pvcNameKey:
×
183
                        containerNameReplaceMap[pvcNameMetadata] = v
×
184
                case pvNameKey:
×
185
                        containerNameReplaceMap[pvNameMetadata] = v
×
186
                case serverNameField:
×
187
                case storageAuthTypeField:
1✔
188
                case storageIdentityClientIDField:
1✔
189
                case storageIdentityObjectIDField:
1✔
190
                case storageIdentityResourceIDField:
1✔
191
                case clientIDField:
1✔
192
                case mountWithWITokenField:
1✔
193
                case tenantIDField:
1✔
194
                case msiEndpointField:
1✔
195
                case storageAADEndpointField:
1✔
196
                        // no op, only used in NodeStageVolume
197
                case storageEndpointSuffixField:
×
198
                        storageEndpointSuffix = v
×
199
                case vnetResourceGroupField:
×
200
                        vnetResourceGroup = v
×
201
                case vnetNameField:
×
202
                        vnetName = v
×
203
                case vnetLinkNameField:
1✔
204
                        vnetLinkName = v
1✔
205
                case subnetNameField:
1✔
206
                        subnetName = v
1✔
207
                case accessTierField:
×
208
                        accessTier = v
×
209
                case networkEndpointTypeField:
1✔
210
                        networkEndpointType = v
1✔
211
                case mountPermissionsField:
11✔
212
                        // only do validations here, used in NodeStageVolume, NodePublishVolume
11✔
213
                        if v != "" {
22✔
214
                                if _, err := strconv.ParseUint(v, 8, 32); err != nil {
12✔
215
                                        return nil, status.Errorf(codes.InvalidArgument, "invalid mountPermissions %s in storage class", v)
1✔
216
                                }
1✔
217
                        }
218
                case useDataPlaneAPIField:
1✔
219
                        useDataPlaneAPI = strings.EqualFold(v, trueValue)
1✔
220
                case fsGroupChangePolicyField:
1✔
221
                        fsGroupChangePolicy = v
1✔
222
                case tagValueDelimiterField:
×
223
                        tagValueDelimiter = v
×
224
                default:
1✔
225
                        return nil, status.Errorf(codes.InvalidArgument, "invalid parameter %q in storage class", k)
1✔
226
                }
227
        }
228

229
        if ptr.Deref(enableBlobVersioning, false) {
18✔
230
                if isNFSProtocol(protocol) || ptr.Deref(isHnsEnabled, false) {
×
231
                        return nil, status.Errorf(codes.InvalidArgument, "enableBlobVersioning is not supported for NFS protocol or HNS enabled account")
×
232
                }
×
233
        }
234

235
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
19✔
236
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
237
        }
1✔
238

239
        if matchTags && account != "" {
18✔
240
                return nil, status.Errorf(codes.InvalidArgument, "matchTags must set as false when storageAccount(%s) is provided", account)
1✔
241
        }
1✔
242

243
        if resourceGroup == "" {
23✔
244
                resourceGroup = d.cloud.ResourceGroup
7✔
245
        }
7✔
246

247
        if secretNamespace == "" {
32✔
248
                if pvcNamespace == "" {
32✔
249
                        secretNamespace = defaultNamespace
16✔
250
                } else {
16✔
251
                        secretNamespace = pvcNamespace
×
252
                }
×
253
        }
254

255
        if protocol == "" {
22✔
256
                protocol = Fuse
6✔
257
        }
6✔
258
        if !isSupportedProtocol(protocol) {
17✔
259
                return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
1✔
260
        }
1✔
261
        if !isSupportedAccessTier(accessTier) {
15✔
262
                return nil, status.Errorf(codes.InvalidArgument, "accessTier(%s) is not supported, supported AccessTier list: %v", accessTier, armstorage.PossibleAccessTierValues())
×
263
        }
×
264
        if !isSupportedPublicNetworkAccess(publicNetworkAccess) {
16✔
265
                return nil, status.Errorf(codes.InvalidArgument, "publicNetworkAccess(%s) is not supported, supported PublicNetworkAccess list: %v", publicNetworkAccess, armstorage.PossiblePublicNetworkAccessValues())
1✔
266
        }
1✔
267

268
        if containerName != "" && containerNamePrefix != "" {
15✔
269
                return nil, status.Errorf(codes.InvalidArgument, "containerName(%s) and containerNamePrefix(%s) could not be specified together", containerName, containerNamePrefix)
1✔
270
        }
1✔
271
        if !isSupportedContainerNamePrefix(containerNamePrefix) {
14✔
272
                return nil, status.Errorf(codes.InvalidArgument, "containerNamePrefix(%s) can only contain lowercase letters, numbers, hyphens, and length should be less than 21", containerNamePrefix)
1✔
273
        }
1✔
274

275
        enableHTTPSTrafficOnly := true
12✔
276
        if strings.EqualFold(networkEndpointType, privateEndpoint) {
13✔
277
                if strings.Contains(subnetName, ",") {
2✔
278
                        return nil, status.Errorf(codes.InvalidArgument, "subnetName(%s) can only contain one subnet for private endpoint", subnetName)
1✔
279
                }
1✔
280
                createPrivateEndpoint = ptr.To(true)
×
281
        }
282
        accountKind := string(armstorage.KindStorageV2)
11✔
283
        if isNFSProtocol(protocol) {
12✔
284
                isHnsEnabled = ptr.To(true)
1✔
285
                enableNfsV3 = ptr.To(true)
1✔
286
                // NFS protocol does not need account key
1✔
287
                storeAccountKey = false
1✔
288
                if !ptr.Deref(createPrivateEndpoint, false) {
2✔
289
                        // set VirtualNetworkResourceIDs for storage account firewall setting
1✔
290
                        var err error
1✔
291
                        if vnetResourceIDs, err = d.updateSubnetServiceEndpoints(ctx, vnetResourceGroup, vnetName, subnetName); err != nil {
2✔
292
                                return nil, status.Errorf(codes.Internal, "update service endpoints failed with error: %v", err)
1✔
293
                        }
1✔
294
                }
295
        }
296

297
        if strings.HasPrefix(strings.ToLower(storageAccountType), "premium") {
11✔
298
                accountKind = string(armstorage.KindBlockBlobStorage)
1✔
299
        }
1✔
300
        if IsAzureStackCloud(d.cloud) {
11✔
301
                accountKind = string(armstorage.KindStorage)
1✔
302
                if storageAccountType != "" && storageAccountType != string(armstorage.SKUNameStandardLRS) && storageAccountType != string(armstorage.SKUNamePremiumLRS) {
2✔
303
                        return nil, status.Errorf(codes.InvalidArgument, "Invalid skuName value: %s, as Azure Stack only supports %s and %s Storage Account types.", storageAccountType, armstorage.SKUNamePremiumLRS, armstorage.SKUNameStandardLRS)
1✔
304
                }
1✔
305
        }
306

307
        tags, err := util.ConvertTagsToMap(customTags, tagValueDelimiter)
9✔
308
        if err != nil {
10✔
309
                return nil, status.Errorf(codes.InvalidArgument, "%v", err)
1✔
310
        }
1✔
311

312
        if strings.TrimSpace(storageEndpointSuffix) == "" {
16✔
313
                storageEndpointSuffix = d.getStorageEndPointSuffix()
8✔
314
        }
8✔
315

316
        if storeAccountKey && !ptr.Deref(allowSharedKeyAccess, true) {
9✔
317
                return nil, status.Errorf(codes.InvalidArgument, "storeAccountKey is not supported for account with shared access key disabled")
1✔
318
        }
1✔
319

320
        requestName := "controller_create_volume"
7✔
321
        if volContentSource != nil {
9✔
322
                switch volContentSource.Type.(type) {
2✔
323
                case *csi.VolumeContentSource_Snapshot:
1✔
324
                        return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
1✔
325
                case *csi.VolumeContentSource_Volume:
1✔
326
                        requestName = "controller_create_volume_from_volume"
1✔
327
                        if volContentSource.GetVolume() != nil {
2✔
328
                                sourceID := volContentSource.GetVolume().VolumeId
1✔
329
                                _, srcAccountName, _, _, _, err = GetContainerInfo(sourceID) //nolint:dogsled
1✔
330
                                if err != nil {
2✔
331
                                        klog.Errorf("failed to get source volume info from sourceID(%s), error: %v", sourceID, err)
1✔
332
                                } else {
1✔
333
                                        klog.V(2).Infof("source volume account name: %s, sourceID: %s", srcAccountName, sourceID)
×
334
                                }
×
335
                        }
336
                }
337
        }
338

339
        accountOptions := &storage.AccountOptions{
6✔
340
                Name:                            account,
6✔
341
                Type:                            storageAccountType,
6✔
342
                Kind:                            accountKind,
6✔
343
                SubscriptionID:                  subsID,
6✔
344
                ResourceGroup:                   resourceGroup,
6✔
345
                Location:                        location,
6✔
346
                EnableHTTPSTrafficOnly:          enableHTTPSTrafficOnly,
6✔
347
                VirtualNetworkResourceIDs:       vnetResourceIDs,
6✔
348
                Tags:                            tags,
6✔
349
                MatchTags:                       matchTags,
6✔
350
                IsHnsEnabled:                    isHnsEnabled,
6✔
351
                EnableNfsV3:                     enableNfsV3,
6✔
352
                AllowBlobPublicAccess:           allowBlobPublicAccess,
6✔
353
                PublicNetworkAccess:             publicNetworkAccess,
6✔
354
                AllowSharedKeyAccess:            allowSharedKeyAccess,
6✔
355
                RequireInfrastructureEncryption: requireInfraEncryption,
6✔
356
                VNetResourceGroup:               vnetResourceGroup,
6✔
357
                VNetName:                        vnetName,
6✔
358
                VNetLinkName:                    vnetLinkName,
6✔
359
                SubnetName:                      subnetName,
6✔
360
                AccessTier:                      accessTier,
6✔
361
                CreatePrivateEndpoint:           createPrivateEndpoint,
6✔
362
                StorageType:                     storage.StorageTypeBlob,
6✔
363
                StorageEndpointSuffix:           storageEndpointSuffix,
6✔
364
                EnableBlobVersioning:            enableBlobVersioning,
6✔
365
                SoftDeleteBlobs:                 softDeleteBlobs,
6✔
366
                SoftDeleteContainers:            softDeleteContainers,
6✔
367
                GetLatestAccountKey:             getLatestAccountKey,
6✔
368
                SourceAccountName:               srcAccountName,
6✔
369
        }
6✔
370

6✔
371
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
6✔
372
        validContainerName := containerName
6✔
373
        if validContainerName == "" {
7✔
374
                validContainerName = volName
1✔
375
                if containerNamePrefix != "" {
1✔
376
                        validContainerName = containerNamePrefix + "-" + volName
×
377
                }
×
378
                validContainerName = getValidContainerName(validContainerName, protocol)
1✔
379
                setKeyValueInMap(parameters, containerNameField, validContainerName)
1✔
380
        }
381

382
        if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
6✔
383
                // logging the job status if it's volume cloning
×
384
                if volContentSource != nil {
×
385
                        jobState, percent, err := d.azcopy.GetAzcopyJob(validContainerName, []string{})
×
386
                        return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
×
387
                }
×
388
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
×
389
        }
390
        defer d.volumeLocks.Release(volName)
6✔
391

6✔
392
        var volumeID string
6✔
393
        mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
6✔
394
        isOperationSucceeded := false
6✔
395
        defer func() {
12✔
396
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
6✔
397
        }()
6✔
398

399
        var accountKey string
6✔
400
        accountName := account
6✔
401
        if len(secrets) == 0 && accountName == "" {
7✔
402
                if v, ok := d.volMap.Load(volName); ok {
1✔
403
                        accountName = v.(string)
×
404
                } else {
1✔
405
                        lockKey := fmt.Sprintf("%s%s%s%s%s%v", storageAccountType, accountKind, resourceGroup, location, protocol, ptr.Deref(createPrivateEndpoint, false))
1✔
406
                        // search in cache first
1✔
407
                        cache, err := d.accountSearchCache.Get(ctx, lockKey, azcache.CacheReadTypeDefault)
1✔
408
                        if err != nil {
1✔
409
                                return nil, status.Errorf(codes.Internal, "%v", err)
×
410
                        }
×
411
                        if cache != nil {
1✔
412
                                accountName = cache.(string)
×
413
                        } else {
1✔
414
                                d.volLockMap.LockEntry(lockKey)
1✔
415
                                err = wait.ExponentialBackoff(getBackOff(d.cloud.Config), func() (bool, error) {
2✔
416
                                        var retErr error
1✔
417
                                        accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
1✔
418
                                        if isRetriableError(retErr) {
1✔
419
                                                klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
×
420
                                                return false, nil
×
421
                                        }
×
422
                                        return true, retErr
1✔
423
                                })
424
                                d.volLockMap.UnlockEntry(lockKey)
1✔
425
                                if err != nil {
2✔
426
                                        return nil, status.Errorf(codes.Internal, "ensure storage account failed with %v", err)
1✔
427
                                }
1✔
428
                                d.accountSearchCache.Set(lockKey, accountName)
×
429
                                d.volMap.Store(volName, accountName)
×
430
                        }
431
                }
432
        }
433

434
        if ptr.Deref(createPrivateEndpoint, false) && isNFSProtocol(protocol) {
5✔
435
                // As for blobfuse/blobfuse2, serverName, i.e.,AZURE_STORAGE_BLOB_ENDPOINT env variable can't include
×
436
                // "privatelink", issue: https://github.com/Azure/azure-storage-fuse/issues/1014
×
437
                //
×
438
                // And use public endpoint will be befine to blobfuse/blobfuse2, because it will be resolved to private endpoint
×
439
                // by private dns zone, which includes CNAME record, documented here:
×
440
                // https://learn.microsoft.com/en-us/azure/storage/common/storage-private-endpoints?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#dns-changes-for-private-endpoints
×
441
                setKeyValueInMap(parameters, serverNameField, fmt.Sprintf("%s.privatelink.blob.%s", accountName, storageEndpointSuffix))
×
442
        }
×
443

444
        accountOptions.Name = accountName
5✔
445
        if len(secrets) == 0 && useDataPlaneAPI {
6✔
446
                if accountKey == "" {
2✔
447
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
2✔
448
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
449
                        }
1✔
450
                }
451
                secrets = createStorageAccountSecret(accountName, accountKey)
×
452
        }
453

454
        klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
4✔
455
        if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, storageEndpointSuffix, secrets); err != nil {
5✔
456
                return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
1✔
457
        }
1✔
458
        if volContentSource != nil {
4✔
459
                accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, false)
1✔
460
                if err != nil {
1✔
461
                        return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
462
                }
×
463
                copyErr := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
1✔
464
                if accountSASToken == "" && copyErr != nil && strings.Contains(copyErr.Error(), authorizationPermissionMismatch) {
1✔
465
                        klog.Warningf("azcopy copy failed with AuthorizationPermissionMismatch error, should assign \"Storage Blob Data Contributor\" role to controller identity, fall back to use sas token, original error: %v", copyErr)
×
466
                        accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, true)
×
467
                        if err != nil {
×
468
                                return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
469
                        }
×
470
                        copyErr = d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
×
471
                }
472
                if copyErr != nil {
2✔
473
                        return nil, copyErr
1✔
474
                }
1✔
475
        }
476

477
        if storeAccountKey && len(req.GetSecrets()) == 0 {
4✔
478
                if accountKey == "" {
4✔
479
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
3✔
480
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
481
                        }
1✔
482
                }
483

484
                secretName, err := setAzureCredentials(ctx, d.KubeClient, accountName, accountKey, secretNamespace)
1✔
485
                if err != nil {
1✔
486
                        return nil, status.Errorf(codes.Internal, "failed to store storage account key: %v", err)
×
487
                }
×
488
                if secretName != "" {
1✔
489
                        klog.V(2).Infof("store account key to k8s secret(%v) in %s namespace", secretName, secretNamespace)
×
490
                }
×
491
        }
492

493
        var uuid string
1✔
494
        if containerName != "" {
2✔
495
                // add volume name as suffix to differentiate volumeID since "containerName" is specified
1✔
496
                // not necessary for dynamic container name creation since volumeID already contains volume name
1✔
497
                uuid = volName
1✔
498
        }
1✔
499
        volumeID = fmt.Sprintf(volumeIDTemplate, resourceGroup, accountName, validContainerName, uuid, secretNamespace, subsID)
1✔
500
        klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)
1✔
501

1✔
502
        if useDataPlaneAPI {
1✔
503
                d.dataPlaneAPIVolCache.Set(volumeID, "")
×
504
                d.dataPlaneAPIVolCache.Set(accountName, "")
×
505
        }
×
506

507
        isOperationSucceeded = true
1✔
508
        // reset secretNamespace field in VolumeContext
1✔
509
        setKeyValueInMap(parameters, secretNamespaceField, secretNamespace)
1✔
510
        return &csi.CreateVolumeResponse{
1✔
511
                Volume: &csi.Volume{
1✔
512
                        VolumeId:      volumeID,
1✔
513
                        CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
1✔
514
                        VolumeContext: parameters,
1✔
515
                        ContentSource: volContentSource,
1✔
516
                },
1✔
517
        }, nil
1✔
518
}
519

520
// DeleteVolume delete a volume
521
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
5✔
522
        volumeID := req.GetVolumeId()
5✔
523
        if len(volumeID) == 0 {
6✔
524
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
525
        }
1✔
526

527
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
4✔
528
                return nil, status.Errorf(codes.Internal, "invalid delete volume req: %v", req)
×
529
        }
×
530

531
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
4✔
532
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
533
        }
×
534
        defer d.volumeLocks.Release(volumeID)
4✔
535

4✔
536
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
4✔
537
        if err != nil {
5✔
538
                // According to CSI Driver Sanity Tester, should succeed when an invalid volume id is used
1✔
539
                klog.Errorf("GetContainerInfo(%s) in DeleteVolume failed with error: %v", volumeID, err)
1✔
540
                return &csi.DeleteVolumeResponse{}, nil
1✔
541
        }
1✔
542

543
        secrets := req.GetSecrets()
3✔
544
        if len(secrets) == 0 && d.useDataPlaneAPI(ctx, volumeID, accountName) {
4✔
545
                _, accountName, accountKey, _, _, err := d.GetAuthEnv(ctx, volumeID, "", nil, secrets)
1✔
546
                if err != nil {
2✔
547
                        return nil, status.Errorf(codes.Internal, "GetAuthEnv(%s) failed with %v", volumeID, err)
1✔
548
                }
1✔
549
                if accountName != "" && accountKey != "" {
×
550
                        secrets = createStorageAccountSecret(accountName, accountKey)
×
551
                }
×
552
        }
553

554
        mc := metrics.NewMetricContext(blobCSIDriverName, "controller_delete_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
2✔
555
        isOperationSucceeded := false
2✔
556
        defer func() {
4✔
557
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
2✔
558
        }()
2✔
559

560
        if resourceGroupName == "" {
3✔
561
                resourceGroupName = d.cloud.ResourceGroup
1✔
562
        }
1✔
563
        klog.V(2).Infof("deleting container(%s) rg(%s) account(%s) volumeID(%s)", containerName, resourceGroupName, accountName, volumeID)
2✔
564
        if err := d.DeleteBlobContainer(ctx, subsID, resourceGroupName, accountName, containerName, secrets); err != nil {
4✔
565
                return nil, status.Errorf(codes.Internal, "failed to delete container(%s) under rg(%s) account(%s) volumeID(%s), error: %v", containerName, resourceGroupName, accountName, volumeID, err)
2✔
566
        }
2✔
567

568
        isOperationSucceeded = true
×
569
        klog.V(2).Infof("container(%s) under rg(%s) account(%s) volumeID(%s) is deleted successfully", containerName, resourceGroupName, accountName, volumeID)
×
570
        return &csi.DeleteVolumeResponse{}, nil
×
571
}
572

573
// ValidateVolumeCapabilities return the capabilities of the volume
574
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
8✔
575
        volumeID := req.GetVolumeId()
8✔
576
        if len(volumeID) == 0 {
9✔
577
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
578
        }
1✔
579
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
9✔
580
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
581
        }
2✔
582

583
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
5✔
584
        if err != nil {
6✔
585
                klog.Errorf("GetContainerInfo(%s) in ValidateVolumeCapabilities failed with error: %v", volumeID, err)
1✔
586
                return nil, status.Error(codes.NotFound, err.Error())
1✔
587
        }
1✔
588

589
        var exist bool
4✔
590
        secrets := req.GetSecrets()
4✔
591
        if len(secrets) > 0 {
5✔
592
                container, err := getContainerReference(containerName, secrets, d.getStorageEndPointSuffix())
1✔
593
                if err != nil {
2✔
594
                        return nil, status.Error(codes.Internal, err.Error())
1✔
595
                }
1✔
596
                exist, err = container.Exists()
×
597
                if err != nil {
×
598
                        return nil, status.Error(codes.Internal, err.Error())
×
599
                }
×
600
        } else {
3✔
601
                if resourceGroupName == "" {
3✔
602
                        resourceGroupName = d.cloud.ResourceGroup
×
603
                }
×
604
                blobClient, err := d.clientFactory.GetBlobContainerClientForSub(subsID)
3✔
605
                if err != nil {
3✔
606
                        return nil, status.Error(codes.Internal, err.Error())
×
607
                }
×
608

609
                blobContainer, err := blobClient.Get(ctx, resourceGroupName, accountName, containerName)
3✔
610
                if err != nil {
4✔
611
                        return nil, status.Error(codes.Internal, err.Error())
1✔
612
                }
1✔
613
                if blobContainer.ContainerProperties == nil {
3✔
614
                        return nil, status.Errorf(codes.Internal, "ContainerProperties of volume(%s) is nil", volumeID)
1✔
615
                }
1✔
616
                exist = blobContainer.ContainerProperties.Deleted != nil && !*blobContainer.ContainerProperties.Deleted
1✔
617
        }
618
        if !exist {
2✔
619
                return nil, status.Errorf(codes.NotFound, "requested volume(%s) does not exist", volumeID)
1✔
620
        }
1✔
621
        klog.V(2).Infof("ValidateVolumeCapabilities on volume(%s) succeeded", volumeID)
×
622

×
623
        // blob driver supports all AccessModes, no need to check capabilities here
×
624
        return &csi.ValidateVolumeCapabilitiesResponse{
×
625
                Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
×
626
                        VolumeCapabilities: req.GetVolumeCapabilities(),
×
627
                },
×
628
                Message: "",
×
629
        }, nil
×
630
}
631

632
// ControllerModifyVolume modify volume
633
func (d *Driver) ControllerModifyVolume(_ context.Context, _ *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
×
634
        return nil, status.Error(codes.Unimplemented, "")
×
635
}
×
636

637
func (d *Driver) ControllerPublishVolume(_ context.Context, _ *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
1✔
638
        return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not yet implemented")
1✔
639
}
1✔
640

641
func (d *Driver) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
1✔
642
        return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not yet implemented")
1✔
643
}
1✔
644

645
// ControllerGetVolume get volume
646
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
1✔
647
        return nil, status.Error(codes.Unimplemented, "ControllerGetVolume is not yet implemented")
1✔
648
}
1✔
649

650
// GetCapacity returns the capacity of the total available storage pool
651
func (d *Driver) GetCapacity(_ context.Context, _ *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
1✔
652
        return nil, status.Error(codes.Unimplemented, "GetCapacity is not yet implemented")
1✔
653
}
1✔
654

655
// ListVolumes return all available volumes
656
func (d *Driver) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
1✔
657
        return nil, status.Error(codes.Unimplemented, "ListVolumes is not yet implemented")
1✔
658
}
1✔
659

660
// CreateSnapshot create snapshot
661
func (d *Driver) CreateSnapshot(_ context.Context, _ *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
1✔
662
        return nil, status.Error(codes.Unimplemented, "CreateSnapshot is not yet implemented")
1✔
663
}
1✔
664

665
// DeleteSnapshot delete snapshot
666
func (d *Driver) DeleteSnapshot(_ context.Context, _ *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
1✔
667
        return nil, status.Error(codes.Unimplemented, "DeleteSnapshot is not yet implemented")
1✔
668
}
1✔
669

670
// ListSnapshots list snapshots
671
func (d *Driver) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
1✔
672
        return nil, status.Error(codes.Unimplemented, "ListSnapshots is not yet implemented")
1✔
673
}
1✔
674

675
// ControllerGetCapabilities returns the capabilities of the Controller plugin
676
func (d *Driver) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
1✔
677
        return &csi.ControllerGetCapabilitiesResponse{
1✔
678
                Capabilities: d.Cap,
1✔
679
        }, nil
1✔
680
}
1✔
681

682
// ControllerExpandVolume controller expand volume
683
func (d *Driver) ControllerExpandVolume(_ context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
4✔
684
        if len(req.GetVolumeId()) == 0 {
5✔
685
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
686
        }
1✔
687

688
        if req.GetCapacityRange() == nil {
4✔
689
                return nil, status.Error(codes.InvalidArgument, "Capacity Range missing in request")
1✔
690
        }
1✔
691

692
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
2✔
693
                return nil, status.Errorf(codes.Internal, "invalid expand volume req: %v", req)
×
694
        }
×
695

696
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
2✔
697
        requestGiB := int64(util.RoundUpGiB(volSizeBytes))
2✔
698

2✔
699
        if volSizeBytes > containerMaxSize {
3✔
700
                return nil, status.Errorf(codes.OutOfRange, "required bytes (%d) exceeds the maximum supported bytes (%d)", volSizeBytes, containerMaxSize)
1✔
701
        }
1✔
702

703
        klog.V(2).Infof("ControllerExpandVolume(%s) successfully, currentQuota: %d Gi", req.VolumeId, requestGiB)
1✔
704

1✔
705
        return &csi.ControllerExpandVolumeResponse{CapacityBytes: req.GetCapacityRange().GetRequiredBytes()}, nil
1✔
706
}
707

708
// CreateBlobContainer creates a blob container
709
func (d *Driver) CreateBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName, storageEndpointSuffix string, secrets map[string]string) error {
10✔
710
        if containerName == "" {
11✔
711
                return fmt.Errorf("containerName is empty")
1✔
712
        }
1✔
713
        return wait.ExponentialBackoff(getBackOff(d.cloud.Config), func() (bool, error) {
18✔
714
                var err error
9✔
715
                if len(secrets) > 0 {
10✔
716
                        container, getErr := getContainerReference(containerName, secrets, storageEndpointSuffix)
1✔
717
                        if getErr != nil {
2✔
718
                                return true, getErr
1✔
719
                        }
1✔
720
                        container.Metadata = map[string]string{createdByMetadata: d.Name}
×
721
                        _, err = container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
×
722
                } else {
8✔
723
                        blobContainer := armstorage.BlobContainer{
8✔
724
                                ContainerProperties: &armstorage.ContainerProperties{
8✔
725
                                        PublicAccess: to.Ptr(armstorage.PublicAccessNone),
8✔
726
                                        Metadata:     map[string]*string{createdByMetadata: to.Ptr(d.Name)},
8✔
727
                                },
8✔
728
                        }
8✔
729
                        var blobClient blobcontainerclient.Interface
8✔
730
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
8✔
731
                        if err != nil {
8✔
732
                                return true, err
×
733
                        }
×
734
                        _, err = blobClient.CreateContainer(ctx, resourceGroupName, accountName, containerName, blobContainer)
8✔
735
                }
736
                if err != nil {
12✔
737
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
4✔
738
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) {
6✔
739
                                klog.Warningf("CreateContainer(%s, %s, %s) failed with error(%v), retry", resourceGroupName, accountName, containerName, err)
2✔
740
                                return false, nil
2✔
741
                        }
2✔
742
                }
743
                return true, err
6✔
744
        })
745
}
746

747
// DeleteBlobContainer deletes a blob container
748
func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
8✔
749
        if containerName == "" {
9✔
750
                return fmt.Errorf("containerName is empty")
1✔
751
        }
1✔
752
        return wait.ExponentialBackoff(getBackOff(d.cloud.Config), func() (bool, error) {
14✔
753
                var err error
7✔
754
                if len(secrets) > 0 {
10✔
755
                        container, getErr := getContainerReference(containerName, secrets, d.getStorageEndPointSuffix())
3✔
756
                        if getErr != nil {
6✔
757
                                return true, getErr
3✔
758
                        }
3✔
759
                        _, err = container.DeleteIfExists(nil)
×
760
                } else {
4✔
761
                        var blobClient blobcontainerclient.Interface
4✔
762
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
4✔
763
                        if err != nil {
4✔
764
                                return true, err
×
765
                        }
×
766
                        err = blobClient.DeleteContainer(ctx, resourceGroupName, accountName, containerName)
4✔
767
                }
768
                if err != nil {
7✔
769
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
3✔
770
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) ||
3✔
771
                                strings.Contains(err.Error(), statusCodeNotFound) ||
3✔
772
                                strings.Contains(err.Error(), httpCodeNotFound) {
5✔
773
                                klog.Warningf("delete container(%s) on account(%s) failed with error(%v), return as success", containerName, accountName, err)
2✔
774
                                return true, nil
2✔
775
                        }
2✔
776
                        return false, fmt.Errorf("failed to delete container(%s) on account(%s), error: %w", containerName, accountName, err)
1✔
777
                }
778
                return true, err
1✔
779
        })
780
}
781

782
// copyBlobContainer copies source volume content into a destination volume
783
func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName string, dstAccountSasToken string, authAzcopyEnv []string, dstContainerName string, secretNamespace string, accountOptions *storage.AccountOptions, storageEndpointSuffix string) error {
7✔
784
        var sourceVolumeID string
7✔
785
        if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
14✔
786
                sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
7✔
787

7✔
788
        }
7✔
789
        srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
7✔
790
        if err != nil {
9✔
791
                return status.Error(codes.NotFound, err.Error())
2✔
792
        }
2✔
793
        if dstAccountName == "" {
9✔
794
                dstAccountName = srcAccountName
4✔
795
        }
4✔
796
        if srcAccountName == "" || srcContainerName == "" || dstContainerName == "" {
7✔
797
                return fmt.Errorf("srcAccountName(%s) or srcContainerName(%s) or dstContainerName(%s) is empty", srcAccountName, srcContainerName, dstContainerName)
2✔
798
        }
2✔
799
        srcAccountSasToken := dstAccountSasToken
3✔
800
        if srcAccountName != dstAccountName && dstAccountSasToken != "" {
4✔
801
                srcAccountOptions := &storage.AccountOptions{
1✔
802
                        Name:                srcAccountName,
1✔
803
                        ResourceGroup:       srcResourceGroupName,
1✔
804
                        SubscriptionID:      srcSubscriptionID,
1✔
805
                        GetLatestAccountKey: accountOptions.GetLatestAccountKey,
1✔
806
                }
1✔
807
                if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace, true); err != nil {
2✔
808
                        return err
1✔
809
                }
1✔
810
        }
811
        srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", srcAccountName, storageEndpointSuffix, srcContainerName, srcAccountSasToken)
2✔
812
        dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", dstAccountName, storageEndpointSuffix, dstContainerName, dstAccountSasToken)
2✔
813

2✔
814
        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
2✔
815
        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
2✔
816
        switch jobState {
2✔
817
        case util.AzcopyJobError, util.AzcopyJobCompleted, util.AzcopyJobCompletedWithErrors, util.AzcopyJobCompletedWithSkipped, util.AzcopyJobCompletedWithErrorsAndSkipped:
1✔
818
                return err
1✔
819
        case util.AzcopyJobRunning:
1✔
820
                err = wait.PollUntilContextTimeout(ctx, 20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, true, func(context.Context) (bool, error) {
5✔
821
                        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
4✔
822
                        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
4✔
823
                        if err != nil {
4✔
824
                                return false, err
×
825
                        }
×
826
                        if jobState == util.AzcopyJobRunning {
8✔
827
                                return false, nil
4✔
828
                        }
4✔
829
                        return true, nil
×
830
                })
831
        case util.AzcopyJobNotFound:
×
832
                klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName)
×
833
                execFunc := func() error {
×
834
                        if out, err := d.execAzcopyCopy(srcPath, dstPath, azcopyCloneVolumeOptions, authAzcopyEnv); err != nil {
×
835
                                return fmt.Errorf("exec error: %v, output: %v", err, string(out))
×
836
                        }
×
837
                        return nil
×
838
                }
839
                timeoutFunc := func() error {
×
840
                        _, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
×
841
                        return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
×
842
                }
×
843
                err = util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
×
844
        }
845
        if err != nil {
2✔
846
                klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, err)
1✔
847
        } else {
1✔
848
                klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
×
849
                if out, err := d.azcopy.CleanJobs(); err != nil {
×
850
                        klog.Warningf("clean azcopy jobs failed with error: %v, output: %s", err, string(out))
×
851
                }
×
852
        }
853
        return err
1✔
854
}
855

856
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
857
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName string, accountSASToken string, authAzcopyEnv []string, dstContainerName, secretNamespace string, accountOptions *storage.AccountOptions, storageEndpointSuffix string) error {
8✔
858
        vs := req.VolumeContentSource
8✔
859
        switch vs.Type.(type) {
8✔
860
        case *csi.VolumeContentSource_Snapshot:
1✔
861
                return status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
1✔
862
        case *csi.VolumeContentSource_Volume:
7✔
863
                return d.copyBlobContainer(ctx, req, accountName, accountSASToken, authAzcopyEnv, dstContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
7✔
864
        default:
×
865
                return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
×
866
        }
867
}
868

869
// execAzcopyCopy exec azcopy copy command
870
func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) {
×
871
        cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
×
872
        cmd.Args = append(cmd.Args, azcopyCopyOptions...)
×
873
        if len(authAzcopyEnv) > 0 {
×
874
                cmd.Env = append(os.Environ(), authAzcopyEnv...)
×
875
        }
×
876
        return cmd.CombinedOutput()
×
877
}
878

879
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
880
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
6✔
881
        azureAuthConfig := d.cloud.Config.AzureAuthConfig
6✔
882
        azureClientConfig := d.cloud.Config.AzureClientConfig
6✔
883
        var authAzcopyEnv []string
6✔
884
        if azureAuthConfig.UseManagedIdentityExtension {
9✔
885
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
3✔
886
                if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
4✔
887
                        klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
1✔
888
                        authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
1✔
889
                } else {
3✔
890
                        klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
2✔
891
                }
2✔
892
                return authAzcopyEnv, nil
3✔
893
        }
894
        if len(azureAuthConfig.AADClientSecret) > 0 {
5✔
895
                klog.V(2).Infof("use service principal to authorize azcopy")
2✔
896
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
2✔
897
                if azureAuthConfig.AADClientID == "" || azureClientConfig.TenantID == "" {
3✔
898
                        return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
1✔
899
                }
1✔
900
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
1✔
901
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
1✔
902
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureClientConfig.TenantID))
1✔
903
                klog.V(2).Infof("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureClientConfig.TenantID)
1✔
904

1✔
905
                return authAzcopyEnv, nil
1✔
906
        }
907
        return []string{}, fmt.Errorf("service principle or managed identity are both not set")
1✔
908
}
909

910
// getAzcopyAuth will only generate sas token for azcopy in following conditions:
911
// 1. secrets is not empty
912
// 2. driver is not using managed identity and service principal
913
// 3. parameter useSasToken is true
914
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *storage.AccountOptions, secrets map[string]string, secretName, secretNamespace string, useSasToken bool) (string, []string, error) {
6✔
915
        var authAzcopyEnv []string
6✔
916
        var err error
6✔
917
        if !useSasToken && !d.useDataPlaneAPI(ctx, "", accountName) && len(secrets) == 0 && len(secretName) == 0 {
7✔
918
                // search in cache first
1✔
919
                if cache, err := d.azcopySasTokenCache.Get(ctx, accountName, azcache.CacheReadTypeDefault); err == nil && cache != nil {
1✔
920
                        klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
921
                        return cache.(string), nil, nil
×
922
                }
×
923

924
                authAzcopyEnv, err = d.authorizeAzcopyWithIdentity()
1✔
925
                if err != nil {
1✔
926
                        klog.Warningf("failed to authorize azcopy with identity, error: %v", err)
×
927
                }
×
928
        }
929

930
        if len(secrets) > 0 || len(secretName) > 0 || len(authAzcopyEnv) == 0 || useSasToken {
11✔
931
                if accountKey == "" {
10✔
932
                        if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
7✔
933
                                return "", nil, err
2✔
934
                        }
2✔
935
                }
936
                klog.V(2).Infof("generate sas token for account(%s)", accountName)
3✔
937
                sasToken, err := d.generateSASToken(ctx, accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
3✔
938
                return sasToken, nil, err
3✔
939
        }
940
        return "", authAzcopyEnv, nil
1✔
941
}
942

943
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
944
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
31✔
945
        if len(volCaps) == 0 {
33✔
946
                return fmt.Errorf("volume capabilities missing in request")
2✔
947
        }
2✔
948
        for _, c := range volCaps {
58✔
949
                if c.GetBlock() != nil {
31✔
950
                        return fmt.Errorf("block volume capability not supported")
2✔
951
                }
2✔
952
        }
953
        return nil
27✔
954
}
955

956
func parseDays(dayStr string) (int32, error) {
3✔
957
        days, err := strconv.Atoi(dayStr)
3✔
958
        if err != nil {
4✔
959
                return 0, status.Errorf(codes.InvalidArgument, "invalid %s:%s in storage class", softDeleteBlobsField, dayStr)
1✔
960
        }
1✔
961
        if days <= 0 || days > 365 {
3✔
962
                return 0, status.Errorf(codes.InvalidArgument, "invalid %s:%s in storage class, should be in range [1, 365]", softDeleteBlobsField, dayStr)
1✔
963
        }
1✔
964

965
        return int32(days), nil
1✔
966
}
967

968
// generateSASToken generate a sas token for storage account
969
func (d *Driver) generateSASToken(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
5✔
970
        // search in cache first
5✔
971
        cache, err := d.azcopySasTokenCache.Get(ctx, accountName, azcache.CacheReadTypeDefault)
5✔
972
        if err != nil {
5✔
973
                return "", fmt.Errorf("get(%s) from azcopySasTokenCache failed with error: %v", accountName, err)
×
974
        }
×
975
        if cache != nil {
5✔
976
                klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
977
                return cache.(string), nil
×
978
        }
×
979

980
        credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
5✔
981
        if err != nil {
8✔
982
                return "", status.Errorf(codes.Internal, "failed to generate sas token in creating new shared key credential, accountName: %s, err: %v", accountName, err)
3✔
983
        }
3✔
984
        clientOptions := service.ClientOptions{}
2✔
985
        clientOptions.InsecureAllowCredentialWithHTTP = true
2✔
986
        serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, &clientOptions)
2✔
987
        if err != nil {
2✔
988
                return "", status.Errorf(codes.Internal, "failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %v", accountName, err)
×
989
        }
×
990
        sasURL, err := serviceClient.GetSASURL(
2✔
991
                sas.AccountResourceTypes{Object: true, Service: false, Container: true},
2✔
992
                sas.AccountPermissions{Read: true, List: true, Write: true},
2✔
993
                time.Now().Add(time.Duration(expiryTime)*time.Minute),
2✔
994
                &service.GetSASURLOptions{StartTime: to.Ptr(time.Now())},
2✔
995
        )
2✔
996
        if err != nil {
2✔
997
                return "", err
×
998
        }
×
999
        u, err := url.Parse(sasURL)
2✔
1000
        if err != nil {
2✔
1001
                return "", err
×
1002
        }
×
1003
        sasToken := "?" + u.RawQuery
2✔
1004
        d.azcopySasTokenCache.Set(accountName, sasToken)
2✔
1005
        return sasToken, nil
2✔
1006
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc