• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 12561927214

31 Dec 2024 04:44PM UTC coverage: 74.335%. Remained the same
12561927214

Pull #1779

github

web-flow
chore(deps): bump github.com/onsi/ginkgo/v2 from 2.22.1 to 2.22.2

Bumps [github.com/onsi/ginkgo/v2](https://github.com/onsi/ginkgo) from 2.22.1 to 2.22.2.
- [Release notes](https://github.com/onsi/ginkgo/releases)
- [Changelog](https://github.com/onsi/ginkgo/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/ginkgo/compare/v2.22.1...v2.22.2)

---
updated-dependencies:
- dependency-name: github.com/onsi/ginkgo/v2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #1779: chore(deps): bump github.com/onsi/ginkgo/v2 from 2.22.1 to 2.22.2

2291 of 3082 relevant lines covered (74.33%)

7.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.22
/pkg/blob/controllerserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "os"
24
        "os/exec"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        "google.golang.org/grpc/codes"
30
        "google.golang.org/grpc/status"
31

32
        "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
33
        "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
34
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
35
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
36
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
37
        azstorage "github.com/Azure/azure-sdk-for-go/storage"
38
        "github.com/container-storage-interface/spec/lib/go/csi"
39

40
        "k8s.io/apimachinery/pkg/util/wait"
41
        "k8s.io/klog/v2"
42
        "k8s.io/utils/ptr"
43

44
        "sigs.k8s.io/blob-csi-driver/pkg/util"
45
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient/blobcontainerclient"
46
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
47
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
48
        "sigs.k8s.io/cloud-provider-azure/pkg/provider"
49
        azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
50
)
51

52
const (
53
        privateEndpoint = "privateendpoint"
54

55
        azcopyAutoLoginType             = "AZCOPY_AUTO_LOGIN_TYPE"
56
        azcopySPAApplicationID          = "AZCOPY_SPA_APPLICATION_ID"
57
        azcopySPAClientSecret           = "AZCOPY_SPA_CLIENT_SECRET"
58
        azcopyTenantID                  = "AZCOPY_TENANT_ID"
59
        azcopyMSIClientID               = "AZCOPY_MSI_CLIENT_ID"
60
        MSI                             = "MSI"
61
        SPN                             = "SPN"
62
        authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
63

64
        createdByMetadata = "createdBy"
65
)
66

67
// CreateVolume provisions a volume
68
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
24✔
69
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
24✔
70
                klog.Errorf("invalid create volume req: %v", req)
×
71
                return nil, err
×
72
        }
×
73

74
        volName := req.GetName()
24✔
75
        if len(volName) == 0 {
25✔
76
                return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
1✔
77
        }
1✔
78

79
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
25✔
80
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
81
        }
2✔
82

83
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
21✔
84
        requestGiB := int(util.RoundUpGiB(volSizeBytes))
21✔
85

21✔
86
        volContentSource := req.GetVolumeContentSource()
21✔
87
        secrets := req.GetSecrets()
21✔
88

21✔
89
        parameters := req.GetParameters()
21✔
90
        if parameters == nil {
22✔
91
                parameters = make(map[string]string)
1✔
92
        }
1✔
93
        var storageAccountType, subsID, resourceGroup, location, account, containerName, containerNamePrefix, protocol, customTags, secretName, secretNamespace, pvcNamespace, tagValueDelimiter string
21✔
94
        var isHnsEnabled, requireInfraEncryption, enableBlobVersioning, createPrivateEndpoint, enableNfsV3, allowSharedKeyAccess *bool
21✔
95
        var vnetResourceGroup, vnetName, subnetName, accessTier, networkEndpointType, storageEndpointSuffix, fsGroupChangePolicy, srcAccountName string
21✔
96
        var matchTags, useDataPlaneAPI, getLatestAccountKey bool
21✔
97
        var softDeleteBlobs, softDeleteContainers int32
21✔
98
        var vnetResourceIDs []string
21✔
99
        var err error
21✔
100
        // set allowBlobPublicAccess as false by default
21✔
101
        allowBlobPublicAccess := ptr.To(false)
21✔
102

21✔
103
        containerNameReplaceMap := map[string]string{}
21✔
104

21✔
105
        // store account key to k8s secret by default
21✔
106
        storeAccountKey := true
21✔
107

21✔
108
        // Apply ProvisionerParameters (case-insensitive). We leave validation of
21✔
109
        // the values to the cloud provider.
21✔
110
        for k, v := range parameters {
122✔
111
                switch strings.ToLower(k) {
101✔
112
                case skuNameField:
9✔
113
                        storageAccountType = v
9✔
114
                case storageAccountTypeField:
10✔
115
                        storageAccountType = v
10✔
116
                case locationField:
9✔
117
                        location = v
9✔
118
                case storageAccountField:
10✔
119
                        account = v
10✔
120
                case subscriptionIDField:
×
121
                        subsID = v
×
122
                case resourceGroupField:
10✔
123
                        resourceGroup = v
10✔
124
                case containerNameField:
11✔
125
                        containerName = v
11✔
126
                case containerNamePrefixField:
2✔
127
                        containerNamePrefix = v
2✔
128
                case protocolField:
10✔
129
                        protocol = v
10✔
130
                case tagsField:
1✔
131
                        customTags = v
1✔
132
                case matchTagsField:
1✔
133
                        matchTags = strings.EqualFold(v, trueValue)
1✔
134
                case secretNameField:
×
135
                        secretName = v
×
136
                case secretNamespaceField:
×
137
                        secretNamespace = v
×
138
                case isHnsEnabledField:
×
139
                        if strings.EqualFold(v, trueValue) {
×
140
                                isHnsEnabled = ptr.To(true)
×
141
                        }
×
142
                case softDeleteBlobsField:
×
143
                        days, err := parseDays(v)
×
144
                        if err != nil {
×
145
                                return nil, err
×
146
                        }
×
147
                        softDeleteBlobs = days
×
148
                case softDeleteContainersField:
×
149
                        days, err := parseDays(v)
×
150
                        if err != nil {
×
151
                                return nil, err
×
152
                        }
×
153
                        softDeleteContainers = days
×
154
                case enableBlobVersioningField:
×
155
                        enableBlobVersioning = ptr.To(strings.EqualFold(v, trueValue))
×
156
                case storeAccountKeyField:
3✔
157
                        if strings.EqualFold(v, falseValue) {
5✔
158
                                storeAccountKey = false
2✔
159
                        }
2✔
160
                case getLatestAccountKeyField:
1✔
161
                        if getLatestAccountKey, err = strconv.ParseBool(v); err != nil {
2✔
162
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", getLatestAccountKeyField, v)
1✔
163
                        }
1✔
164
                case allowBlobPublicAccessField:
×
165
                        if strings.EqualFold(v, trueValue) {
×
166
                                allowBlobPublicAccess = ptr.To(true)
×
167
                        }
×
168
                case allowSharedKeyAccessField:
2✔
169
                        var boolValue bool
2✔
170
                        if boolValue, err = strconv.ParseBool(v); err != nil {
3✔
171
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", allowSharedKeyAccessField, v)
1✔
172
                        }
1✔
173
                        allowSharedKeyAccess = ptr.To(boolValue)
1✔
174
                case requireInfraEncryptionField:
×
175
                        if strings.EqualFold(v, trueValue) {
×
176
                                requireInfraEncryption = ptr.To(true)
×
177
                        }
×
178
                case pvcNamespaceKey:
×
179
                        pvcNamespace = v
×
180
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
181
                case pvcNameKey:
×
182
                        containerNameReplaceMap[pvcNameMetadata] = v
×
183
                case pvNameKey:
×
184
                        containerNameReplaceMap[pvNameMetadata] = v
×
185
                case serverNameField:
×
186
                case storageAuthTypeField:
1✔
187
                case storageIdentityClientIDField:
1✔
188
                case storageIdentityObjectIDField:
1✔
189
                case storageIdentityResourceIDField:
1✔
190
                case msiEndpointField:
1✔
191
                case storageAADEndpointField:
1✔
192
                        // no op, only used in NodeStageVolume
193
                case storageEndpointSuffixField:
×
194
                        storageEndpointSuffix = v
×
195
                case vnetResourceGroupField:
×
196
                        vnetResourceGroup = v
×
197
                case vnetNameField:
×
198
                        vnetName = v
×
199
                case subnetNameField:
1✔
200
                        subnetName = v
1✔
201
                case accessTierField:
×
202
                        accessTier = v
×
203
                case networkEndpointTypeField:
1✔
204
                        networkEndpointType = v
1✔
205
                case mountPermissionsField:
11✔
206
                        // only do validations here, used in NodeStageVolume, NodePublishVolume
11✔
207
                        if v != "" {
22✔
208
                                if _, err := strconv.ParseUint(v, 8, 32); err != nil {
12✔
209
                                        return nil, status.Errorf(codes.InvalidArgument, "invalid mountPermissions %s in storage class", v)
1✔
210
                                }
1✔
211
                        }
212
                case useDataPlaneAPIField:
1✔
213
                        useDataPlaneAPI = strings.EqualFold(v, trueValue)
1✔
214
                case fsGroupChangePolicyField:
1✔
215
                        fsGroupChangePolicy = v
1✔
216
                case tagValueDelimiterField:
×
217
                        tagValueDelimiter = v
×
218
                default:
1✔
219
                        return nil, status.Errorf(codes.InvalidArgument, "invalid parameter %q in storage class", k)
1✔
220
                }
221
        }
222

223
        if ptr.Deref(enableBlobVersioning, false) {
17✔
224
                if isNFSProtocol(protocol) || ptr.Deref(isHnsEnabled, false) {
×
225
                        return nil, status.Errorf(codes.InvalidArgument, "enableBlobVersioning is not supported for NFS protocol or HNS enabled account")
×
226
                }
×
227
        }
228

229
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
18✔
230
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
231
        }
1✔
232

233
        if matchTags && account != "" {
17✔
234
                return nil, status.Errorf(codes.InvalidArgument, "matchTags must set as false when storageAccount(%s) is provided", account)
1✔
235
        }
1✔
236

237
        if resourceGroup == "" {
21✔
238
                resourceGroup = d.cloud.ResourceGroup
6✔
239
        }
6✔
240

241
        if secretNamespace == "" {
30✔
242
                if pvcNamespace == "" {
30✔
243
                        secretNamespace = defaultNamespace
15✔
244
                } else {
15✔
245
                        secretNamespace = pvcNamespace
×
246
                }
×
247
        }
248

249
        if protocol == "" {
20✔
250
                protocol = Fuse
5✔
251
        }
5✔
252
        if !isSupportedProtocol(protocol) {
16✔
253
                return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
1✔
254
        }
1✔
255
        if !isSupportedAccessTier(accessTier) {
14✔
256
                return nil, status.Errorf(codes.InvalidArgument, "accessTier(%s) is not supported, supported AccessTier list: %v", accessTier, armstorage.PossibleAccessTierValues())
×
257
        }
×
258

259
        if containerName != "" && containerNamePrefix != "" {
15✔
260
                return nil, status.Errorf(codes.InvalidArgument, "containerName(%s) and containerNamePrefix(%s) could not be specified together", containerName, containerNamePrefix)
1✔
261
        }
1✔
262
        if !isSupportedContainerNamePrefix(containerNamePrefix) {
14✔
263
                return nil, status.Errorf(codes.InvalidArgument, "containerNamePrefix(%s) can only contain lowercase letters, numbers, hyphens, and length should be less than 21", containerNamePrefix)
1✔
264
        }
1✔
265

266
        enableHTTPSTrafficOnly := true
12✔
267
        if strings.EqualFold(networkEndpointType, privateEndpoint) {
13✔
268
                if strings.Contains(subnetName, ",") {
2✔
269
                        return nil, status.Errorf(codes.InvalidArgument, "subnetName(%s) can only contain one subnet for private endpoint", subnetName)
1✔
270
                }
1✔
271
                createPrivateEndpoint = ptr.To(true)
×
272
        }
273
        accountKind := string(armstorage.KindStorageV2)
11✔
274
        if isNFSProtocol(protocol) {
12✔
275
                isHnsEnabled = ptr.To(true)
1✔
276
                enableNfsV3 = ptr.To(true)
1✔
277
                // NFS protocol does not need account key
1✔
278
                storeAccountKey = false
1✔
279
                if !ptr.Deref(createPrivateEndpoint, false) {
2✔
280
                        // set VirtualNetworkResourceIDs for storage account firewall setting
1✔
281
                        var err error
1✔
282
                        if vnetResourceIDs, err = d.updateSubnetServiceEndpoints(ctx, vnetResourceGroup, vnetName, subnetName); err != nil {
2✔
283
                                return nil, status.Errorf(codes.Internal, "update service endpoints failed with error: %v", err)
1✔
284
                        }
1✔
285
                }
286
        }
287

288
        if strings.HasPrefix(strings.ToLower(storageAccountType), "premium") {
11✔
289
                accountKind = string(armstorage.KindBlockBlobStorage)
1✔
290
        }
1✔
291
        if IsAzureStackCloud(d.cloud) {
11✔
292
                accountKind = string(armstorage.KindStorage)
1✔
293
                if storageAccountType != "" && storageAccountType != string(armstorage.SKUNameStandardLRS) && storageAccountType != string(armstorage.SKUNamePremiumLRS) {
2✔
294
                        return nil, status.Errorf(codes.InvalidArgument, "Invalid skuName value: %s, as Azure Stack only supports %s and %s Storage Account types.", storageAccountType, armstorage.SKUNamePremiumLRS, armstorage.SKUNameStandardLRS)
1✔
295
                }
1✔
296
        }
297

298
        tags, err := util.ConvertTagsToMap(customTags, tagValueDelimiter)
9✔
299
        if err != nil {
10✔
300
                return nil, status.Errorf(codes.InvalidArgument, "%v", err)
1✔
301
        }
1✔
302

303
        if strings.TrimSpace(storageEndpointSuffix) == "" {
16✔
304
                storageEndpointSuffix = d.getStorageEndPointSuffix()
8✔
305
        }
8✔
306

307
        if storeAccountKey && !ptr.Deref(allowSharedKeyAccess, true) {
9✔
308
                return nil, status.Errorf(codes.InvalidArgument, "storeAccountKey is not supported for account with shared access key disabled")
1✔
309
        }
1✔
310

311
        requestName := "controller_create_volume"
7✔
312
        if volContentSource != nil {
9✔
313
                switch volContentSource.Type.(type) {
2✔
314
                case *csi.VolumeContentSource_Snapshot:
1✔
315
                        return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
1✔
316
                case *csi.VolumeContentSource_Volume:
1✔
317
                        requestName = "controller_create_volume_from_volume"
1✔
318
                        if volContentSource.GetVolume() != nil {
2✔
319
                                sourceID := volContentSource.GetVolume().VolumeId
1✔
320
                                _, srcAccountName, _, _, _, err = GetContainerInfo(sourceID) //nolint:dogsled
1✔
321
                                if err != nil {
2✔
322
                                        klog.Errorf("failed to get source volume info from sourceID(%s), error: %v", sourceID, err)
1✔
323
                                } else {
1✔
324
                                        klog.V(2).Infof("source volume account name: %s, sourceID: %s", srcAccountName, sourceID)
×
325
                                }
×
326
                        }
327
                }
328
        }
329

330
        accountOptions := &azure.AccountOptions{
6✔
331
                Name:                            account,
6✔
332
                Type:                            storageAccountType,
6✔
333
                Kind:                            accountKind,
6✔
334
                SubscriptionID:                  subsID,
6✔
335
                ResourceGroup:                   resourceGroup,
6✔
336
                Location:                        location,
6✔
337
                EnableHTTPSTrafficOnly:          enableHTTPSTrafficOnly,
6✔
338
                VirtualNetworkResourceIDs:       vnetResourceIDs,
6✔
339
                Tags:                            tags,
6✔
340
                MatchTags:                       matchTags,
6✔
341
                IsHnsEnabled:                    isHnsEnabled,
6✔
342
                EnableNfsV3:                     enableNfsV3,
6✔
343
                AllowBlobPublicAccess:           allowBlobPublicAccess,
6✔
344
                AllowSharedKeyAccess:            allowSharedKeyAccess,
6✔
345
                RequireInfrastructureEncryption: requireInfraEncryption,
6✔
346
                VNetResourceGroup:               vnetResourceGroup,
6✔
347
                VNetName:                        vnetName,
6✔
348
                SubnetName:                      subnetName,
6✔
349
                AccessTier:                      accessTier,
6✔
350
                CreatePrivateEndpoint:           createPrivateEndpoint,
6✔
351
                StorageType:                     provider.StorageTypeBlob,
6✔
352
                StorageEndpointSuffix:           storageEndpointSuffix,
6✔
353
                EnableBlobVersioning:            enableBlobVersioning,
6✔
354
                SoftDeleteBlobs:                 softDeleteBlobs,
6✔
355
                SoftDeleteContainers:            softDeleteContainers,
6✔
356
                GetLatestAccountKey:             getLatestAccountKey,
6✔
357
                SourceAccountName:               srcAccountName,
6✔
358
        }
6✔
359

6✔
360
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
6✔
361
        validContainerName := containerName
6✔
362
        if validContainerName == "" {
7✔
363
                validContainerName = volName
1✔
364
                if containerNamePrefix != "" {
1✔
365
                        validContainerName = containerNamePrefix + "-" + volName
×
366
                }
×
367
                validContainerName = getValidContainerName(validContainerName, protocol)
1✔
368
                setKeyValueInMap(parameters, containerNameField, validContainerName)
1✔
369
        }
370

371
        if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
6✔
372
                // logging the job status if it's volume cloning
×
373
                if volContentSource != nil {
×
374
                        jobState, percent, err := d.azcopy.GetAzcopyJob(validContainerName, []string{})
×
375
                        return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
×
376
                }
×
377
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
×
378
        }
379
        defer d.volumeLocks.Release(volName)
6✔
380

6✔
381
        var volumeID string
6✔
382
        mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
6✔
383
        isOperationSucceeded := false
6✔
384
        defer func() {
12✔
385
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
6✔
386
        }()
6✔
387

388
        var accountKey string
6✔
389
        accountName := account
6✔
390
        if len(secrets) == 0 && accountName == "" {
7✔
391
                if v, ok := d.volMap.Load(volName); ok {
1✔
392
                        accountName = v.(string)
×
393
                } else {
1✔
394
                        lockKey := fmt.Sprintf("%s%s%s%s%s%v", storageAccountType, accountKind, resourceGroup, location, protocol, ptr.Deref(createPrivateEndpoint, false))
1✔
395
                        // search in cache first
1✔
396
                        cache, err := d.accountSearchCache.Get(ctx, lockKey, azcache.CacheReadTypeDefault)
1✔
397
                        if err != nil {
1✔
398
                                return nil, status.Errorf(codes.Internal, "%v", err)
×
399
                        }
×
400
                        if cache != nil {
1✔
401
                                accountName = cache.(string)
×
402
                        } else {
1✔
403
                                d.volLockMap.LockEntry(lockKey)
1✔
404
                                err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
2✔
405
                                        var retErr error
1✔
406
                                        accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
1✔
407
                                        if isRetriableError(retErr) {
1✔
408
                                                klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
×
409
                                                return false, nil
×
410
                                        }
×
411
                                        return true, retErr
1✔
412
                                })
413
                                d.volLockMap.UnlockEntry(lockKey)
1✔
414
                                if err != nil {
2✔
415
                                        return nil, status.Errorf(codes.Internal, "ensure storage account failed with %v", err)
1✔
416
                                }
1✔
417
                                d.accountSearchCache.Set(lockKey, accountName)
×
418
                                d.volMap.Store(volName, accountName)
×
419
                        }
420
                }
421
        }
422

423
        if ptr.Deref(createPrivateEndpoint, false) && isNFSProtocol(protocol) {
5✔
424
                // As for blobfuse/blobfuse2, serverName, i.e.,AZURE_STORAGE_BLOB_ENDPOINT env variable can't include
×
425
                // "privatelink", issue: https://github.com/Azure/azure-storage-fuse/issues/1014
×
426
                //
×
427
                // And use public endpoint will be befine to blobfuse/blobfuse2, because it will be resolved to private endpoint
×
428
                // by private dns zone, which includes CNAME record, documented here:
×
429
                // https://learn.microsoft.com/en-us/azure/storage/common/storage-private-endpoints?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#dns-changes-for-private-endpoints
×
430
                setKeyValueInMap(parameters, serverNameField, fmt.Sprintf("%s.privatelink.blob.%s", accountName, storageEndpointSuffix))
×
431
        }
×
432

433
        accountOptions.Name = accountName
5✔
434
        if len(secrets) == 0 && useDataPlaneAPI {
6✔
435
                if accountKey == "" {
2✔
436
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
2✔
437
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
438
                        }
1✔
439
                }
440
                secrets = createStorageAccountSecret(accountName, accountKey)
×
441
        }
442

443
        klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
4✔
444
        if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
5✔
445
                return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
1✔
446
        }
1✔
447
        if volContentSource != nil {
4✔
448
                accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, false)
1✔
449
                if err != nil {
1✔
450
                        return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
451
                }
×
452
                copyErr := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
1✔
453
                if accountSASToken == "" && copyErr != nil && strings.Contains(copyErr.Error(), authorizationPermissionMismatch) {
1✔
454
                        klog.Warningf("azcopy copy failed with AuthorizationPermissionMismatch error, should assign \"Storage Blob Data Contributor\" role to controller identity, fall back to use sas token, original error: %v", copyErr)
×
455
                        accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, true)
×
456
                        if err != nil {
×
457
                                return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
458
                        }
×
459
                        copyErr = d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
×
460
                }
461
                if copyErr != nil {
2✔
462
                        return nil, copyErr
1✔
463
                }
1✔
464
        }
465

466
        if storeAccountKey && len(req.GetSecrets()) == 0 {
4✔
467
                if accountKey == "" {
4✔
468
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
3✔
469
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
470
                        }
1✔
471
                }
472

473
                secretName, err := setAzureCredentials(ctx, d.KubeClient, accountName, accountKey, secretNamespace)
1✔
474
                if err != nil {
1✔
475
                        return nil, status.Errorf(codes.Internal, "failed to store storage account key: %v", err)
×
476
                }
×
477
                if secretName != "" {
1✔
478
                        klog.V(2).Infof("store account key to k8s secret(%v) in %s namespace", secretName, secretNamespace)
×
479
                }
×
480
        }
481

482
        var uuid string
1✔
483
        if containerName != "" {
2✔
484
                // add volume name as suffix to differentiate volumeID since "containerName" is specified
1✔
485
                // not necessary for dynamic container name creation since volumeID already contains volume name
1✔
486
                uuid = volName
1✔
487
        }
1✔
488
        volumeID = fmt.Sprintf(volumeIDTemplate, resourceGroup, accountName, validContainerName, uuid, secretNamespace, subsID)
1✔
489
        klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)
1✔
490

1✔
491
        if useDataPlaneAPI {
1✔
492
                d.dataPlaneAPIVolCache.Set(volumeID, "")
×
493
                d.dataPlaneAPIVolCache.Set(accountName, "")
×
494
        }
×
495

496
        isOperationSucceeded = true
1✔
497
        // reset secretNamespace field in VolumeContext
1✔
498
        setKeyValueInMap(parameters, secretNamespaceField, secretNamespace)
1✔
499
        return &csi.CreateVolumeResponse{
1✔
500
                Volume: &csi.Volume{
1✔
501
                        VolumeId:      volumeID,
1✔
502
                        CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
1✔
503
                        VolumeContext: parameters,
1✔
504
                        ContentSource: volContentSource,
1✔
505
                },
1✔
506
        }, nil
1✔
507
}
508

509
// DeleteVolume delete a volume
510
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
5✔
511
        volumeID := req.GetVolumeId()
5✔
512
        if len(volumeID) == 0 {
6✔
513
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
514
        }
1✔
515

516
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
4✔
517
                return nil, status.Errorf(codes.Internal, "invalid delete volume req: %v", req)
×
518
        }
×
519

520
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
4✔
521
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
522
        }
×
523
        defer d.volumeLocks.Release(volumeID)
4✔
524

4✔
525
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
4✔
526
        if err != nil {
5✔
527
                // According to CSI Driver Sanity Tester, should succeed when an invalid volume id is used
1✔
528
                klog.Errorf("GetContainerInfo(%s) in DeleteVolume failed with error: %v", volumeID, err)
1✔
529
                return &csi.DeleteVolumeResponse{}, nil
1✔
530
        }
1✔
531

532
        secrets := req.GetSecrets()
3✔
533
        if len(secrets) == 0 && d.useDataPlaneAPI(ctx, volumeID, accountName) {
4✔
534
                _, accountName, accountKey, _, _, err := d.GetAuthEnv(ctx, volumeID, "", nil, secrets)
1✔
535
                if err != nil {
2✔
536
                        return nil, status.Errorf(codes.Internal, "GetAuthEnv(%s) failed with %v", volumeID, err)
1✔
537
                }
1✔
538
                if accountName != "" && accountKey != "" {
×
539
                        secrets = createStorageAccountSecret(accountName, accountKey)
×
540
                }
×
541
        }
542

543
        mc := metrics.NewMetricContext(blobCSIDriverName, "controller_delete_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
2✔
544
        isOperationSucceeded := false
2✔
545
        defer func() {
4✔
546
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
2✔
547
        }()
2✔
548

549
        if resourceGroupName == "" {
3✔
550
                resourceGroupName = d.cloud.ResourceGroup
1✔
551
        }
1✔
552
        klog.V(2).Infof("deleting container(%s) rg(%s) account(%s) volumeID(%s)", containerName, resourceGroupName, accountName, volumeID)
2✔
553
        if err := d.DeleteBlobContainer(ctx, subsID, resourceGroupName, accountName, containerName, secrets); err != nil {
4✔
554
                return nil, status.Errorf(codes.Internal, "failed to delete container(%s) under rg(%s) account(%s) volumeID(%s), error: %v", containerName, resourceGroupName, accountName, volumeID, err)
2✔
555
        }
2✔
556

557
        isOperationSucceeded = true
×
558
        klog.V(2).Infof("container(%s) under rg(%s) account(%s) volumeID(%s) is deleted successfully", containerName, resourceGroupName, accountName, volumeID)
×
559
        return &csi.DeleteVolumeResponse{}, nil
×
560
}
561

562
// ValidateVolumeCapabilities return the capabilities of the volume
563
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
8✔
564
        volumeID := req.GetVolumeId()
8✔
565
        if len(volumeID) == 0 {
9✔
566
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
567
        }
1✔
568
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
9✔
569
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
570
        }
2✔
571

572
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
5✔
573
        if err != nil {
6✔
574
                klog.Errorf("GetContainerInfo(%s) in ValidateVolumeCapabilities failed with error: %v", volumeID, err)
1✔
575
                return nil, status.Error(codes.NotFound, err.Error())
1✔
576
        }
1✔
577

578
        var exist bool
4✔
579
        secrets := req.GetSecrets()
4✔
580
        if len(secrets) > 0 {
5✔
581
                container, err := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
582
                if err != nil {
2✔
583
                        return nil, status.Error(codes.Internal, err.Error())
1✔
584
                }
1✔
585
                exist, err = container.Exists()
×
586
                if err != nil {
×
587
                        return nil, status.Error(codes.Internal, err.Error())
×
588
                }
×
589
        } else {
3✔
590
                if resourceGroupName == "" {
3✔
591
                        resourceGroupName = d.cloud.ResourceGroup
×
592
                }
×
593
                blobClient, err := d.clientFactory.GetBlobContainerClientForSub(subsID)
3✔
594
                if err != nil {
3✔
595
                        return nil, status.Error(codes.Internal, err.Error())
×
596
                }
×
597

598
                blobContainer, err := blobClient.Get(ctx, resourceGroupName, accountName, containerName)
3✔
599
                if err != nil {
4✔
600
                        return nil, status.Error(codes.Internal, err.Error())
1✔
601
                }
1✔
602
                if blobContainer.ContainerProperties == nil {
3✔
603
                        return nil, status.Errorf(codes.Internal, "ContainerProperties of volume(%s) is nil", volumeID)
1✔
604
                }
1✔
605
                exist = blobContainer.ContainerProperties.Deleted != nil && !*blobContainer.ContainerProperties.Deleted
1✔
606
        }
607
        if !exist {
2✔
608
                return nil, status.Errorf(codes.NotFound, "requested volume(%s) does not exist", volumeID)
1✔
609
        }
1✔
610
        klog.V(2).Infof("ValidateVolumeCapabilities on volume(%s) succeeded", volumeID)
×
611

×
612
        // blob driver supports all AccessModes, no need to check capabilities here
×
613
        return &csi.ValidateVolumeCapabilitiesResponse{
×
614
                Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
×
615
                        VolumeCapabilities: req.GetVolumeCapabilities(),
×
616
                },
×
617
                Message: "",
×
618
        }, nil
×
619
}
620

621
// ControllerModifyVolume modify volume
622
func (d *Driver) ControllerModifyVolume(_ context.Context, _ *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
×
623
        return nil, status.Error(codes.Unimplemented, "")
×
624
}
×
625

626
func (d *Driver) ControllerPublishVolume(_ context.Context, _ *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
1✔
627
        return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not yet implemented")
1✔
628
}
1✔
629

630
func (d *Driver) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
1✔
631
        return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not yet implemented")
1✔
632
}
1✔
633

634
// ControllerGetVolume get volume
635
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
1✔
636
        return nil, status.Error(codes.Unimplemented, "ControllerGetVolume is not yet implemented")
1✔
637
}
1✔
638

639
// GetCapacity returns the capacity of the total available storage pool
640
func (d *Driver) GetCapacity(_ context.Context, _ *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
1✔
641
        return nil, status.Error(codes.Unimplemented, "GetCapacity is not yet implemented")
1✔
642
}
1✔
643

644
// ListVolumes return all available volumes
645
func (d *Driver) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
1✔
646
        return nil, status.Error(codes.Unimplemented, "ListVolumes is not yet implemented")
1✔
647
}
1✔
648

649
// CreateSnapshot create snapshot
650
func (d *Driver) CreateSnapshot(_ context.Context, _ *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
1✔
651
        return nil, status.Error(codes.Unimplemented, "CreateSnapshot is not yet implemented")
1✔
652
}
1✔
653

654
// DeleteSnapshot delete snapshot
655
func (d *Driver) DeleteSnapshot(_ context.Context, _ *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
1✔
656
        return nil, status.Error(codes.Unimplemented, "DeleteSnapshot is not yet implemented")
1✔
657
}
1✔
658

659
// ListSnapshots list snapshots
660
func (d *Driver) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
1✔
661
        return nil, status.Error(codes.Unimplemented, "ListSnapshots is not yet implemented")
1✔
662
}
1✔
663

664
// ControllerGetCapabilities returns the capabilities of the Controller plugin
665
func (d *Driver) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
1✔
666
        return &csi.ControllerGetCapabilitiesResponse{
1✔
667
                Capabilities: d.Cap,
1✔
668
        }, nil
1✔
669
}
1✔
670

671
// ControllerExpandVolume controller expand volume
672
func (d *Driver) ControllerExpandVolume(_ context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
4✔
673
        if len(req.GetVolumeId()) == 0 {
5✔
674
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
675
        }
1✔
676

677
        if req.GetCapacityRange() == nil {
4✔
678
                return nil, status.Error(codes.InvalidArgument, "Capacity Range missing in request")
1✔
679
        }
1✔
680

681
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
2✔
682
                return nil, status.Errorf(codes.Internal, "invalid expand volume req: %v", req)
×
683
        }
×
684

685
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
2✔
686
        requestGiB := int64(util.RoundUpGiB(volSizeBytes))
2✔
687

2✔
688
        if volSizeBytes > containerMaxSize {
3✔
689
                return nil, status.Errorf(codes.OutOfRange, "required bytes (%d) exceeds the maximum supported bytes (%d)", volSizeBytes, containerMaxSize)
1✔
690
        }
1✔
691

692
        klog.V(2).Infof("ControllerExpandVolume(%s) successfully, currentQuota: %d Gi", req.VolumeId, requestGiB)
1✔
693

1✔
694
        return &csi.ControllerExpandVolumeResponse{CapacityBytes: req.GetCapacityRange().GetRequiredBytes()}, nil
1✔
695
}
696

697
// CreateBlobContainer creates a blob container
698
func (d *Driver) CreateBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
10✔
699
        if containerName == "" {
11✔
700
                return fmt.Errorf("containerName is empty")
1✔
701
        }
1✔
702
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
18✔
703
                var err error
9✔
704
                if len(secrets) > 0 {
10✔
705
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
706
                        if getErr != nil {
2✔
707
                                return true, getErr
1✔
708
                        }
1✔
709
                        container.Metadata = map[string]string{createdByMetadata: d.Name}
×
710
                        _, err = container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
×
711
                } else {
8✔
712
                        blobContainer := armstorage.BlobContainer{
8✔
713
                                ContainerProperties: &armstorage.ContainerProperties{
8✔
714
                                        PublicAccess: to.Ptr(armstorage.PublicAccessNone),
8✔
715
                                        Metadata:     map[string]*string{createdByMetadata: to.Ptr(d.Name)},
8✔
716
                                },
8✔
717
                        }
8✔
718
                        var blobClient blobcontainerclient.Interface
8✔
719
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
8✔
720
                        if err != nil {
8✔
721
                                return true, err
×
722
                        }
×
723
                        _, err = blobClient.CreateContainer(ctx, resourceGroupName, accountName, containerName, blobContainer)
8✔
724
                }
725
                if err != nil {
12✔
726
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
4✔
727
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) {
6✔
728
                                klog.Warningf("CreateContainer(%s, %s, %s) failed with error(%v), retry", resourceGroupName, accountName, containerName, err)
2✔
729
                                return false, nil
2✔
730
                        }
2✔
731
                }
732
                return true, err
6✔
733
        })
734
}
735

736
// DeleteBlobContainer deletes a blob container
737
func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
8✔
738
        if containerName == "" {
9✔
739
                return fmt.Errorf("containerName is empty")
1✔
740
        }
1✔
741
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
14✔
742
                var err error
7✔
743
                if len(secrets) > 0 {
10✔
744
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
3✔
745
                        if getErr != nil {
6✔
746
                                return true, getErr
3✔
747
                        }
3✔
748
                        _, err = container.DeleteIfExists(nil)
×
749
                } else {
4✔
750
                        var blobClient blobcontainerclient.Interface
4✔
751
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
4✔
752
                        if err != nil {
4✔
753
                                return true, err
×
754
                        }
×
755
                        err = blobClient.DeleteContainer(ctx, resourceGroupName, accountName, containerName)
4✔
756
                }
757
                if err != nil {
7✔
758
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
3✔
759
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) ||
3✔
760
                                strings.Contains(err.Error(), statusCodeNotFound) ||
3✔
761
                                strings.Contains(err.Error(), httpCodeNotFound) {
5✔
762
                                klog.Warningf("delete container(%s) on account(%s) failed with error(%v), return as success", containerName, accountName, err)
2✔
763
                                return true, nil
2✔
764
                        }
2✔
765
                        return false, fmt.Errorf("failed to delete container(%s) on account(%s), error: %w", containerName, accountName, err)
1✔
766
                }
767
                return true, err
1✔
768
        })
769
}
770

771
// copyBlobContainer copies source volume content into a destination volume
772
func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName string, dstAccountSasToken string, authAzcopyEnv []string, dstContainerName string, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
7✔
773
        var sourceVolumeID string
7✔
774
        if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
14✔
775
                sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
7✔
776

7✔
777
        }
7✔
778
        srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
7✔
779
        if err != nil {
9✔
780
                return status.Error(codes.NotFound, err.Error())
2✔
781
        }
2✔
782
        if dstAccountName == "" {
9✔
783
                dstAccountName = srcAccountName
4✔
784
        }
4✔
785
        if srcAccountName == "" || srcContainerName == "" || dstContainerName == "" {
7✔
786
                return fmt.Errorf("srcAccountName(%s) or srcContainerName(%s) or dstContainerName(%s) is empty", srcAccountName, srcContainerName, dstContainerName)
2✔
787
        }
2✔
788
        srcAccountSasToken := dstAccountSasToken
3✔
789
        if srcAccountName != dstAccountName && dstAccountSasToken != "" {
4✔
790
                srcAccountOptions := &azure.AccountOptions{
1✔
791
                        Name:                srcAccountName,
1✔
792
                        ResourceGroup:       srcResourceGroupName,
1✔
793
                        SubscriptionID:      srcSubscriptionID,
1✔
794
                        GetLatestAccountKey: accountOptions.GetLatestAccountKey,
1✔
795
                }
1✔
796
                if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace, true); err != nil {
2✔
797
                        return err
1✔
798
                }
1✔
799
        }
800
        srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", srcAccountName, storageEndpointSuffix, srcContainerName, srcAccountSasToken)
2✔
801
        dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", dstAccountName, storageEndpointSuffix, dstContainerName, dstAccountSasToken)
2✔
802

2✔
803
        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
2✔
804
        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
2✔
805
        switch jobState {
2✔
806
        case util.AzcopyJobError, util.AzcopyJobCompleted:
1✔
807
                return err
1✔
808
        case util.AzcopyJobRunning:
1✔
809
                err = wait.PollImmediate(20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, func() (bool, error) {
6✔
810
                        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
5✔
811
                        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
5✔
812
                        if err != nil {
5✔
813
                                return false, err
×
814
                        }
×
815
                        if jobState == util.AzcopyJobRunning {
10✔
816
                                return false, nil
5✔
817
                        }
5✔
818
                        return true, nil
×
819
                })
820
        case util.AzcopyJobNotFound:
×
821
                klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName)
×
822
                execFunc := func() error {
×
823
                        if out, err := d.execAzcopyCopy(srcPath, dstPath, azcopyCloneVolumeOptions, authAzcopyEnv); err != nil {
×
824
                                return fmt.Errorf("exec error: %v, output: %v", err, string(out))
×
825
                        }
×
826
                        return nil
×
827
                }
828
                timeoutFunc := func() error {
×
829
                        _, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
×
830
                        return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
×
831
                }
×
832
                err = util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
×
833
        }
834
        if err != nil {
2✔
835
                klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, err)
1✔
836
        } else {
1✔
837
                klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
×
838
        }
×
839
        return err
1✔
840
}
841

842
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
843
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName string, accountSASToken string, authAzcopyEnv []string, dstContainerName, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
8✔
844
        vs := req.VolumeContentSource
8✔
845
        switch vs.Type.(type) {
8✔
846
        case *csi.VolumeContentSource_Snapshot:
1✔
847
                return status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
1✔
848
        case *csi.VolumeContentSource_Volume:
7✔
849
                return d.copyBlobContainer(ctx, req, accountName, accountSASToken, authAzcopyEnv, dstContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
7✔
850
        default:
×
851
                return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
×
852
        }
853
}
854

855
// execAzcopyCopy exec azcopy copy command
856
func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) {
×
857
        cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
×
858
        cmd.Args = append(cmd.Args, azcopyCopyOptions...)
×
859
        if len(authAzcopyEnv) > 0 {
×
860
                cmd.Env = append(os.Environ(), authAzcopyEnv...)
×
861
        }
×
862
        return cmd.CombinedOutput()
×
863
}
864

865
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
866
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
6✔
867
        azureAuthConfig := d.cloud.Config.AzureAuthConfig
6✔
868
        azureClientConfig := d.cloud.Config.AzureClientConfig
6✔
869
        var authAzcopyEnv []string
6✔
870
        if azureAuthConfig.UseManagedIdentityExtension {
9✔
871
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
3✔
872
                if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
4✔
873
                        klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
1✔
874
                        authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
1✔
875
                } else {
3✔
876
                        klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
2✔
877
                }
2✔
878
                return authAzcopyEnv, nil
3✔
879
        }
880
        if len(azureAuthConfig.AADClientSecret) > 0 {
5✔
881
                klog.V(2).Infof("use service principal to authorize azcopy")
2✔
882
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
2✔
883
                if azureAuthConfig.AADClientID == "" || azureClientConfig.TenantID == "" {
3✔
884
                        return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
1✔
885
                }
1✔
886
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
1✔
887
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
1✔
888
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureClientConfig.TenantID))
1✔
889
                klog.V(2).Infof("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureClientConfig.TenantID)
1✔
890

1✔
891
                return authAzcopyEnv, nil
1✔
892
        }
893
        return []string{}, fmt.Errorf("service principle or managed identity are both not set")
1✔
894
}
895

896
// getAzcopyAuth will only generate sas token for azcopy in following conditions:
897
// 1. secrets is not empty
898
// 2. driver is not using managed identity and service principal
899
// 3. parameter useSasToken is true
900
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string, useSasToken bool) (string, []string, error) {
6✔
901
        var authAzcopyEnv []string
6✔
902
        var err error
6✔
903
        if !useSasToken && !d.useDataPlaneAPI(ctx, "", accountName) && len(secrets) == 0 && len(secretName) == 0 {
7✔
904
                // search in cache first
1✔
905
                if cache, err := d.azcopySasTokenCache.Get(ctx, accountName, azcache.CacheReadTypeDefault); err == nil && cache != nil {
1✔
906
                        klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
907
                        return cache.(string), nil, nil
×
908
                }
×
909

910
                authAzcopyEnv, err = d.authorizeAzcopyWithIdentity()
1✔
911
                if err != nil {
1✔
912
                        klog.Warningf("failed to authorize azcopy with identity, error: %v", err)
×
913
                }
×
914
        }
915

916
        if len(secrets) > 0 || len(secretName) > 0 || len(authAzcopyEnv) == 0 || useSasToken {
11✔
917
                if accountKey == "" {
10✔
918
                        if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
7✔
919
                                return "", nil, err
2✔
920
                        }
2✔
921
                }
922
                klog.V(2).Infof("generate sas token for account(%s)", accountName)
3✔
923
                sasToken, err := d.generateSASToken(ctx, accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
3✔
924
                return sasToken, nil, err
3✔
925
        }
926
        return "", authAzcopyEnv, nil
1✔
927
}
928

929
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
930
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
30✔
931
        if len(volCaps) == 0 {
32✔
932
                return fmt.Errorf("volume capabilities missing in request")
2✔
933
        }
2✔
934
        for _, c := range volCaps {
56✔
935
                if c.GetBlock() != nil {
30✔
936
                        return fmt.Errorf("block volume capability not supported")
2✔
937
                }
2✔
938
        }
939
        return nil
26✔
940
}
941

942
func parseDays(dayStr string) (int32, error) {
3✔
943
        days, err := strconv.Atoi(dayStr)
3✔
944
        if err != nil {
4✔
945
                return 0, status.Errorf(codes.InvalidArgument, "invalid %s:%s in storage class", softDeleteBlobsField, dayStr)
1✔
946
        }
1✔
947
        if days <= 0 || days > 365 {
3✔
948
                return 0, status.Errorf(codes.InvalidArgument, "invalid %s:%s in storage class, should be in range [1, 365]", softDeleteBlobsField, dayStr)
1✔
949
        }
1✔
950

951
        return int32(days), nil
1✔
952
}
953

954
// generateSASToken generate a sas token for storage account
955
func (d *Driver) generateSASToken(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
5✔
956
        // search in cache first
5✔
957
        cache, err := d.azcopySasTokenCache.Get(ctx, accountName, azcache.CacheReadTypeDefault)
5✔
958
        if err != nil {
5✔
959
                return "", fmt.Errorf("get(%s) from azcopySasTokenCache failed with error: %v", accountName, err)
×
960
        }
×
961
        if cache != nil {
5✔
962
                klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
963
                return cache.(string), nil
×
964
        }
×
965

966
        credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
5✔
967
        if err != nil {
8✔
968
                return "", status.Errorf(codes.Internal, "failed to generate sas token in creating new shared key credential, accountName: %s, err: %v", accountName, err)
3✔
969
        }
3✔
970
        clientOptions := service.ClientOptions{}
2✔
971
        clientOptions.InsecureAllowCredentialWithHTTP = true
2✔
972
        serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, &clientOptions)
2✔
973
        if err != nil {
2✔
974
                return "", status.Errorf(codes.Internal, "failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %v", accountName, err)
×
975
        }
×
976
        sasURL, err := serviceClient.GetSASURL(
2✔
977
                sas.AccountResourceTypes{Object: true, Service: false, Container: true},
2✔
978
                sas.AccountPermissions{Read: true, List: true, Write: true},
2✔
979
                time.Now().Add(time.Duration(expiryTime)*time.Minute),
2✔
980
                &service.GetSASURLOptions{StartTime: to.Ptr(time.Now())},
2✔
981
        )
2✔
982
        if err != nil {
2✔
983
                return "", err
×
984
        }
×
985
        u, err := url.Parse(sasURL)
2✔
986
        if err != nil {
2✔
987
                return "", err
×
988
        }
×
989
        sasToken := "?" + u.RawQuery
2✔
990
        d.azcopySasTokenCache.Set(accountName, sasToken)
2✔
991
        return sasToken, nil
2✔
992
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc