• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 10512035464

22 Aug 2024 04:32PM UTC coverage: 73.88%. Remained the same
10512035464

Pull #1553

github

web-flow
chore(deps): bump github.com/onsi/ginkgo/v2 from 2.19.1 to 2.20.1

Bumps [github.com/onsi/ginkgo/v2](https://github.com/onsi/ginkgo) from 2.19.1 to 2.20.1.
- [Release notes](https://github.com/onsi/ginkgo/releases)
- [Changelog](https://github.com/onsi/ginkgo/blob/master/CHANGELOG.md)
- [Commits](https://github.com/onsi/ginkgo/compare/v2.19.1...v2.20.1)

---
updated-dependencies:
- dependency-name: github.com/onsi/ginkgo/v2
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #1553: chore(deps): bump github.com/onsi/ginkgo/v2 from 2.19.1 to 2.20.1

2260 of 3059 relevant lines covered (73.88%)

7.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.12
/pkg/blob/controllerserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "os"
24
        "os/exec"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        "google.golang.org/grpc/codes"
30
        "google.golang.org/grpc/status"
31

32
        "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
33
        "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
34
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
35
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
36
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
37
        "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
38
        azstorage "github.com/Azure/azure-sdk-for-go/storage"
39
        "github.com/container-storage-interface/spec/lib/go/csi"
40

41
        "k8s.io/apimachinery/pkg/util/wait"
42
        "k8s.io/klog/v2"
43
        "k8s.io/utils/pointer"
44

45
        "sigs.k8s.io/blob-csi-driver/pkg/util"
46
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient/blobcontainerclient"
47
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
48
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
49
        "sigs.k8s.io/cloud-provider-azure/pkg/provider"
50
        azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
51
)
52

53
const (
54
        privateEndpoint = "privateendpoint"
55

56
        azcopyAutoLoginType             = "AZCOPY_AUTO_LOGIN_TYPE"
57
        azcopySPAApplicationID          = "AZCOPY_SPA_APPLICATION_ID"
58
        azcopySPAClientSecret           = "AZCOPY_SPA_CLIENT_SECRET"
59
        azcopyTenantID                  = "AZCOPY_TENANT_ID"
60
        azcopyMSIClientID               = "AZCOPY_MSI_CLIENT_ID"
61
        MSI                             = "MSI"
62
        SPN                             = "SPN"
63
        authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
64

65
        createdByMetadata = "createdBy"
66
)
67

68
// CreateVolume provisions a volume
69
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
24✔
70
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
24✔
71
                klog.Errorf("invalid create volume req: %v", req)
×
72
                return nil, err
×
73
        }
×
74

75
        volName := req.GetName()
24✔
76
        if len(volName) == 0 {
25✔
77
                return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
1✔
78
        }
1✔
79

80
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
25✔
81
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
82
        }
2✔
83

84
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
21✔
85
        requestGiB := int(util.RoundUpGiB(volSizeBytes))
21✔
86

21✔
87
        volContentSource := req.GetVolumeContentSource()
21✔
88
        secrets := req.GetSecrets()
21✔
89

21✔
90
        parameters := req.GetParameters()
21✔
91
        if parameters == nil {
22✔
92
                parameters = make(map[string]string)
1✔
93
        }
1✔
94
        var storageAccountType, subsID, resourceGroup, location, account, containerName, containerNamePrefix, protocol, customTags, secretName, secretNamespace, pvcNamespace, tagValueDelimiter string
21✔
95
        var isHnsEnabled, requireInfraEncryption, enableBlobVersioning, createPrivateEndpoint, enableNfsV3, allowSharedKeyAccess *bool
21✔
96
        var vnetResourceGroup, vnetName, subnetName, accessTier, networkEndpointType, storageEndpointSuffix, fsGroupChangePolicy string
21✔
97
        var matchTags, useDataPlaneAPI, getLatestAccountKey bool
21✔
98
        var softDeleteBlobs, softDeleteContainers int32
21✔
99
        var vnetResourceIDs []string
21✔
100
        var err error
21✔
101
        // set allowBlobPublicAccess as false by default
21✔
102
        allowBlobPublicAccess := pointer.Bool(false)
21✔
103

21✔
104
        containerNameReplaceMap := map[string]string{}
21✔
105

21✔
106
        // store account key to k8s secret by default
21✔
107
        storeAccountKey := true
21✔
108

21✔
109
        // Apply ProvisionerParameters (case-insensitive). We leave validation of
21✔
110
        // the values to the cloud provider.
21✔
111
        for k, v := range parameters {
125✔
112
                switch strings.ToLower(k) {
104✔
113
                case skuNameField:
9✔
114
                        storageAccountType = v
9✔
115
                case storageAccountTypeField:
11✔
116
                        storageAccountType = v
11✔
117
                case locationField:
10✔
118
                        location = v
10✔
119
                case storageAccountField:
11✔
120
                        account = v
11✔
121
                case subscriptionIDField:
×
122
                        subsID = v
×
123
                case resourceGroupField:
10✔
124
                        resourceGroup = v
10✔
125
                case containerNameField:
11✔
126
                        containerName = v
11✔
127
                case containerNamePrefixField:
2✔
128
                        containerNamePrefix = v
2✔
129
                case protocolField:
10✔
130
                        protocol = v
10✔
131
                case tagsField:
1✔
132
                        customTags = v
1✔
133
                case matchTagsField:
1✔
134
                        matchTags = strings.EqualFold(v, trueValue)
1✔
135
                case secretNameField:
×
136
                        secretName = v
×
137
                case secretNamespaceField:
×
138
                        secretNamespace = v
×
139
                case isHnsEnabledField:
×
140
                        if strings.EqualFold(v, trueValue) {
×
141
                                isHnsEnabled = pointer.Bool(true)
×
142
                        }
×
143
                case softDeleteBlobsField:
×
144
                        days, err := parseDays(v)
×
145
                        if err != nil {
×
146
                                return nil, err
×
147
                        }
×
148
                        softDeleteBlobs = days
×
149
                case softDeleteContainersField:
×
150
                        days, err := parseDays(v)
×
151
                        if err != nil {
×
152
                                return nil, err
×
153
                        }
×
154
                        softDeleteContainers = days
×
155
                case enableBlobVersioningField:
×
156
                        enableBlobVersioning = pointer.Bool(strings.EqualFold(v, trueValue))
×
157
                case storeAccountKeyField:
3✔
158
                        if strings.EqualFold(v, falseValue) {
5✔
159
                                storeAccountKey = false
2✔
160
                        }
2✔
161
                case getLatestAccountKeyField:
1✔
162
                        if getLatestAccountKey, err = strconv.ParseBool(v); err != nil {
2✔
163
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", getLatestAccountKeyField, v)
1✔
164
                        }
1✔
165
                case allowBlobPublicAccessField:
×
166
                        if strings.EqualFold(v, trueValue) {
×
167
                                allowBlobPublicAccess = pointer.Bool(true)
×
168
                        }
×
169
                case allowSharedKeyAccessField:
2✔
170
                        var boolValue bool
2✔
171
                        if boolValue, err = strconv.ParseBool(v); err != nil {
3✔
172
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", allowSharedKeyAccessField, v)
1✔
173
                        }
1✔
174
                        allowSharedKeyAccess = pointer.Bool(boolValue)
1✔
175
                case requireInfraEncryptionField:
×
176
                        if strings.EqualFold(v, trueValue) {
×
177
                                requireInfraEncryption = pointer.Bool(true)
×
178
                        }
×
179
                case pvcNamespaceKey:
×
180
                        pvcNamespace = v
×
181
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
182
                case pvcNameKey:
×
183
                        containerNameReplaceMap[pvcNameMetadata] = v
×
184
                case pvNameKey:
×
185
                        containerNameReplaceMap[pvNameMetadata] = v
×
186
                case serverNameField:
×
187
                case storageAuthTypeField:
1✔
188
                case storageIdentityClientIDField:
1✔
189
                case storageIdentityObjectIDField:
1✔
190
                case storageIdentityResourceIDField:
1✔
191
                case msiEndpointField:
1✔
192
                case storageAADEndpointField:
1✔
193
                        // no op, only used in NodeStageVolume
194
                case storageEndpointSuffixField:
×
195
                        storageEndpointSuffix = v
×
196
                case vnetResourceGroupField:
×
197
                        vnetResourceGroup = v
×
198
                case vnetNameField:
×
199
                        vnetName = v
×
200
                case subnetNameField:
1✔
201
                        subnetName = v
1✔
202
                case accessTierField:
×
203
                        accessTier = v
×
204
                case networkEndpointTypeField:
1✔
205
                        networkEndpointType = v
1✔
206
                case mountPermissionsField:
11✔
207
                        // only do validations here, used in NodeStageVolume, NodePublishVolume
11✔
208
                        if v != "" {
22✔
209
                                if _, err := strconv.ParseUint(v, 8, 32); err != nil {
12✔
210
                                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s in storage class", v))
1✔
211
                                }
1✔
212
                        }
213
                case useDataPlaneAPIField:
1✔
214
                        useDataPlaneAPI = strings.EqualFold(v, trueValue)
1✔
215
                case fsGroupChangePolicyField:
1✔
216
                        fsGroupChangePolicy = v
1✔
217
                case tagValueDelimiterField:
×
218
                        tagValueDelimiter = v
×
219
                default:
1✔
220
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid parameter %q in storage class", k))
1✔
221
                }
222
        }
223

224
        if pointer.BoolDeref(enableBlobVersioning, false) {
17✔
225
                if isNFSProtocol(protocol) || pointer.BoolDeref(isHnsEnabled, false) {
×
226
                        return nil, status.Errorf(codes.InvalidArgument, "enableBlobVersioning is not supported for NFS protocol or HNS enabled account")
×
227
                }
×
228
        }
229

230
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
18✔
231
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
232
        }
1✔
233

234
        if matchTags && account != "" {
17✔
235
                return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("matchTags must set as false when storageAccount(%s) is provided", account))
1✔
236
        }
1✔
237

238
        if resourceGroup == "" {
21✔
239
                resourceGroup = d.cloud.ResourceGroup
6✔
240
        }
6✔
241

242
        if secretNamespace == "" {
30✔
243
                if pvcNamespace == "" {
30✔
244
                        secretNamespace = defaultNamespace
15✔
245
                } else {
15✔
246
                        secretNamespace = pvcNamespace
×
247
                }
×
248
        }
249

250
        if protocol == "" {
20✔
251
                protocol = Fuse
5✔
252
        }
5✔
253
        if !isSupportedProtocol(protocol) {
16✔
254
                return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
1✔
255
        }
1✔
256
        if !isSupportedAccessTier(accessTier) {
14✔
257
                return nil, status.Errorf(codes.InvalidArgument, "accessTier(%s) is not supported, supported AccessTier list: %v", accessTier, armstorage.PossibleAccessTierValues())
×
258
        }
×
259

260
        if containerName != "" && containerNamePrefix != "" {
15✔
261
                return nil, status.Errorf(codes.InvalidArgument, "containerName(%s) and containerNamePrefix(%s) could not be specified together", containerName, containerNamePrefix)
1✔
262
        }
1✔
263
        if !isSupportedContainerNamePrefix(containerNamePrefix) {
14✔
264
                return nil, status.Errorf(codes.InvalidArgument, "containerNamePrefix(%s) can only contain lowercase letters, numbers, hyphens, and length should be less than 21", containerNamePrefix)
1✔
265
        }
1✔
266

267
        enableHTTPSTrafficOnly := true
12✔
268
        if strings.EqualFold(networkEndpointType, privateEndpoint) {
13✔
269
                if strings.Contains(subnetName, ",") {
2✔
270
                        return nil, status.Errorf(codes.InvalidArgument, "subnetName(%s) can only contain one subnet for private endpoint", subnetName)
1✔
271
                }
1✔
272
                createPrivateEndpoint = pointer.BoolPtr(true)
×
273
        }
274
        accountKind := string(armstorage.KindStorageV2)
11✔
275
        if isNFSProtocol(protocol) {
12✔
276
                isHnsEnabled = pointer.Bool(true)
1✔
277
                enableNfsV3 = pointer.Bool(true)
1✔
278
                // NFS protocol does not need account key
1✔
279
                storeAccountKey = false
1✔
280
                if !pointer.BoolDeref(createPrivateEndpoint, false) {
2✔
281
                        // set VirtualNetworkResourceIDs for storage account firewall setting
1✔
282
                        var err error
1✔
283
                        if vnetResourceIDs, err = d.updateSubnetServiceEndpoints(ctx, vnetResourceGroup, vnetName, subnetName); err != nil {
2✔
284
                                return nil, status.Errorf(codes.Internal, "update service endpoints failed with error: %v", err)
1✔
285
                        }
1✔
286
                }
287
        }
288

289
        if strings.HasPrefix(strings.ToLower(storageAccountType), "premium") {
11✔
290
                accountKind = string(armstorage.KindBlockBlobStorage)
1✔
291
        }
1✔
292
        if IsAzureStackCloud(d.cloud) {
11✔
293
                accountKind = string(armstorage.KindStorage)
1✔
294
                if storageAccountType != "" && storageAccountType != string(armstorage.SKUNameStandardLRS) && storageAccountType != string(armstorage.SKUNamePremiumLRS) {
2✔
295
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("Invalid skuName value: %s, as Azure Stack only supports %s and %s Storage Account types.", storageAccountType, storage.SkuNamePremiumLRS, storage.SkuNameStandardLRS))
1✔
296
                }
1✔
297
        }
298

299
        tags, err := util.ConvertTagsToMap(customTags, tagValueDelimiter)
9✔
300
        if err != nil {
10✔
301
                return nil, status.Errorf(codes.InvalidArgument, err.Error())
1✔
302
        }
1✔
303

304
        if strings.TrimSpace(storageEndpointSuffix) == "" {
16✔
305
                storageEndpointSuffix = d.getStorageEndPointSuffix()
8✔
306
        }
8✔
307

308
        if storeAccountKey && !pointer.BoolDeref(allowSharedKeyAccess, true) {
9✔
309
                return nil, status.Errorf(codes.InvalidArgument, "storeAccountKey is not supported for account with shared access key disabled")
1✔
310
        }
1✔
311

312
        accountOptions := &azure.AccountOptions{
7✔
313
                Name:                            account,
7✔
314
                Type:                            storageAccountType,
7✔
315
                Kind:                            accountKind,
7✔
316
                SubscriptionID:                  subsID,
7✔
317
                ResourceGroup:                   resourceGroup,
7✔
318
                Location:                        location,
7✔
319
                EnableHTTPSTrafficOnly:          enableHTTPSTrafficOnly,
7✔
320
                VirtualNetworkResourceIDs:       vnetResourceIDs,
7✔
321
                Tags:                            tags,
7✔
322
                MatchTags:                       matchTags,
7✔
323
                IsHnsEnabled:                    isHnsEnabled,
7✔
324
                EnableNfsV3:                     enableNfsV3,
7✔
325
                AllowBlobPublicAccess:           allowBlobPublicAccess,
7✔
326
                AllowSharedKeyAccess:            allowSharedKeyAccess,
7✔
327
                RequireInfrastructureEncryption: requireInfraEncryption,
7✔
328
                VNetResourceGroup:               vnetResourceGroup,
7✔
329
                VNetName:                        vnetName,
7✔
330
                SubnetName:                      subnetName,
7✔
331
                AccessTier:                      accessTier,
7✔
332
                CreatePrivateEndpoint:           createPrivateEndpoint,
7✔
333
                StorageType:                     provider.StorageTypeBlob,
7✔
334
                StorageEndpointSuffix:           storageEndpointSuffix,
7✔
335
                EnableBlobVersioning:            enableBlobVersioning,
7✔
336
                SoftDeleteBlobs:                 softDeleteBlobs,
7✔
337
                SoftDeleteContainers:            softDeleteContainers,
7✔
338
                GetLatestAccountKey:             getLatestAccountKey,
7✔
339
        }
7✔
340

7✔
341
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
7✔
342
        validContainerName := containerName
7✔
343
        if validContainerName == "" {
8✔
344
                validContainerName = volName
1✔
345
                if containerNamePrefix != "" {
1✔
346
                        validContainerName = containerNamePrefix + "-" + volName
×
347
                }
×
348
                validContainerName = getValidContainerName(validContainerName, protocol)
1✔
349
                setKeyValueInMap(parameters, containerNameField, validContainerName)
1✔
350
        }
351

352
        if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
7✔
353
                // logging the job status if it's volume cloning
×
354
                if volContentSource != nil {
×
355
                        jobState, percent, err := d.azcopy.GetAzcopyJob(validContainerName, []string{})
×
356
                        return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
×
357
                }
×
358
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
×
359
        }
360
        defer d.volumeLocks.Release(volName)
7✔
361

7✔
362
        requestName := "controller_create_volume"
7✔
363
        if volContentSource != nil {
9✔
364
                switch volContentSource.Type.(type) {
2✔
365
                case *csi.VolumeContentSource_Snapshot:
1✔
366
                        return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
1✔
367
                case *csi.VolumeContentSource_Volume:
1✔
368
                        requestName = "controller_create_volume_from_volume"
1✔
369
                }
370
        }
371

372
        var volumeID string
6✔
373
        mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
6✔
374
        isOperationSucceeded := false
6✔
375
        defer func() {
12✔
376
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
6✔
377
        }()
6✔
378

379
        var accountKey string
6✔
380
        accountName := account
6✔
381
        if len(secrets) == 0 && accountName == "" {
7✔
382
                if v, ok := d.volMap.Load(volName); ok {
1✔
383
                        accountName = v.(string)
×
384
                } else {
1✔
385
                        lockKey := fmt.Sprintf("%s%s%s%s%s%v", storageAccountType, accountKind, resourceGroup, location, protocol, pointer.BoolDeref(createPrivateEndpoint, false))
1✔
386
                        // search in cache first
1✔
387
                        cache, err := d.accountSearchCache.Get(lockKey, azcache.CacheReadTypeDefault)
1✔
388
                        if err != nil {
1✔
389
                                return nil, status.Errorf(codes.Internal, err.Error())
×
390
                        }
×
391
                        if cache != nil {
1✔
392
                                accountName = cache.(string)
×
393
                        } else {
1✔
394
                                d.volLockMap.LockEntry(lockKey)
1✔
395
                                err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
2✔
396
                                        var retErr error
1✔
397
                                        accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
1✔
398
                                        if isRetriableError(retErr) {
1✔
399
                                                klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
×
400
                                                return false, nil
×
401
                                        }
×
402
                                        return true, retErr
1✔
403
                                })
404
                                d.volLockMap.UnlockEntry(lockKey)
1✔
405
                                if err != nil {
2✔
406
                                        return nil, status.Errorf(codes.Internal, "ensure storage account failed with %v", err)
1✔
407
                                }
1✔
408
                                d.accountSearchCache.Set(lockKey, accountName)
×
409
                                d.volMap.Store(volName, accountName)
×
410
                        }
411
                }
412
        }
413

414
        if pointer.BoolDeref(createPrivateEndpoint, false) && isNFSProtocol(protocol) {
5✔
415
                // As for blobfuse/blobfuse2, serverName, i.e.,AZURE_STORAGE_BLOB_ENDPOINT env variable can't include
×
416
                // "privatelink", issue: https://github.com/Azure/azure-storage-fuse/issues/1014
×
417
                //
×
418
                // And use public endpoint will be befine to blobfuse/blobfuse2, because it will be resolved to private endpoint
×
419
                // by private dns zone, which includes CNAME record, documented here:
×
420
                // https://learn.microsoft.com/en-us/azure/storage/common/storage-private-endpoints?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#dns-changes-for-private-endpoints
×
421
                setKeyValueInMap(parameters, serverNameField, fmt.Sprintf("%s.privatelink.blob.%s", accountName, storageEndpointSuffix))
×
422
        }
×
423

424
        accountOptions.Name = accountName
5✔
425
        if len(secrets) == 0 && useDataPlaneAPI {
6✔
426
                if accountKey == "" {
2✔
427
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
2✔
428
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
429
                        }
1✔
430
                }
431
                secrets = createStorageAccountSecret(accountName, accountKey)
×
432
        }
433

434
        klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
4✔
435
        if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
5✔
436
                return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
1✔
437
        }
1✔
438
        if volContentSource != nil {
4✔
439
                accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
1✔
440
                if err != nil {
1✔
441
                        return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
442
                }
×
443
                if err := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix); err != nil {
2✔
444
                        return nil, err
1✔
445
                }
1✔
446
        }
447

448
        if storeAccountKey && len(req.GetSecrets()) == 0 {
4✔
449
                if accountKey == "" {
4✔
450
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
3✔
451
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
452
                        }
1✔
453
                }
454

455
                secretName, err := setAzureCredentials(ctx, d.KubeClient, accountName, accountKey, secretNamespace)
1✔
456
                if err != nil {
1✔
457
                        return nil, status.Errorf(codes.Internal, "failed to store storage account key: %v", err)
×
458
                }
×
459
                if secretName != "" {
1✔
460
                        klog.V(2).Infof("store account key to k8s secret(%v) in %s namespace", secretName, secretNamespace)
×
461
                }
×
462
        }
463

464
        var uuid string
1✔
465
        if containerName != "" {
2✔
466
                // add volume name as suffix to differentiate volumeID since "containerName" is specified
1✔
467
                // not necessary for dynamic container name creation since volumeID already contains volume name
1✔
468
                uuid = volName
1✔
469
        }
1✔
470
        volumeID = fmt.Sprintf(volumeIDTemplate, resourceGroup, accountName, validContainerName, uuid, secretNamespace, subsID)
1✔
471
        klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)
1✔
472

1✔
473
        if useDataPlaneAPI {
1✔
474
                d.dataPlaneAPIVolCache.Set(volumeID, "")
×
475
                d.dataPlaneAPIVolCache.Set(accountName, "")
×
476
        }
×
477

478
        isOperationSucceeded = true
1✔
479
        // reset secretNamespace field in VolumeContext
1✔
480
        setKeyValueInMap(parameters, secretNamespaceField, secretNamespace)
1✔
481
        return &csi.CreateVolumeResponse{
1✔
482
                Volume: &csi.Volume{
1✔
483
                        VolumeId:      volumeID,
1✔
484
                        CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
1✔
485
                        VolumeContext: parameters,
1✔
486
                        ContentSource: volContentSource,
1✔
487
                },
1✔
488
        }, nil
1✔
489
}
490

491
// DeleteVolume delete a volume
492
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
5✔
493
        volumeID := req.GetVolumeId()
5✔
494
        if len(volumeID) == 0 {
6✔
495
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
496
        }
1✔
497

498
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
4✔
499
                return nil, status.Errorf(codes.Internal, "invalid delete volume req: %v", req)
×
500
        }
×
501

502
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
4✔
503
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
504
        }
×
505
        defer d.volumeLocks.Release(volumeID)
4✔
506

4✔
507
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
4✔
508
        if err != nil {
5✔
509
                // According to CSI Driver Sanity Tester, should succeed when an invalid volume id is used
1✔
510
                klog.Errorf("GetContainerInfo(%s) in DeleteVolume failed with error: %v", volumeID, err)
1✔
511
                return &csi.DeleteVolumeResponse{}, nil
1✔
512
        }
1✔
513

514
        secrets := req.GetSecrets()
3✔
515
        if len(secrets) == 0 && d.useDataPlaneAPI(volumeID, accountName) {
4✔
516
                _, accountName, accountKey, _, _, err := d.GetAuthEnv(ctx, volumeID, "", nil, secrets)
1✔
517
                if err != nil {
2✔
518
                        return nil, status.Errorf(codes.Internal, "GetAuthEnv(%s) failed with %v", volumeID, err)
1✔
519
                }
1✔
520
                if accountName != "" && accountKey != "" {
×
521
                        secrets = createStorageAccountSecret(accountName, accountKey)
×
522
                }
×
523
        }
524

525
        mc := metrics.NewMetricContext(blobCSIDriverName, "controller_delete_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
2✔
526
        isOperationSucceeded := false
2✔
527
        defer func() {
4✔
528
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
2✔
529
        }()
2✔
530

531
        if resourceGroupName == "" {
3✔
532
                resourceGroupName = d.cloud.ResourceGroup
1✔
533
        }
1✔
534
        klog.V(2).Infof("deleting container(%s) rg(%s) account(%s) volumeID(%s)", containerName, resourceGroupName, accountName, volumeID)
2✔
535
        if err := d.DeleteBlobContainer(ctx, subsID, resourceGroupName, accountName, containerName, secrets); err != nil {
4✔
536
                return nil, status.Errorf(codes.Internal, "failed to delete container(%s) under rg(%s) account(%s) volumeID(%s), error: %v", containerName, resourceGroupName, accountName, volumeID, err)
2✔
537
        }
2✔
538

539
        isOperationSucceeded = true
×
540
        klog.V(2).Infof("container(%s) under rg(%s) account(%s) volumeID(%s) is deleted successfully", containerName, resourceGroupName, accountName, volumeID)
×
541
        return &csi.DeleteVolumeResponse{}, nil
×
542
}
543

544
// ValidateVolumeCapabilities return the capabilities of the volume
545
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
8✔
546
        volumeID := req.GetVolumeId()
8✔
547
        if len(volumeID) == 0 {
9✔
548
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
549
        }
1✔
550
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
9✔
551
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
552
        }
2✔
553

554
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
5✔
555
        if err != nil {
6✔
556
                klog.Errorf("GetContainerInfo(%s) in ValidateVolumeCapabilities failed with error: %v", volumeID, err)
1✔
557
                return nil, status.Error(codes.NotFound, err.Error())
1✔
558
        }
1✔
559

560
        var exist bool
4✔
561
        secrets := req.GetSecrets()
4✔
562
        if len(secrets) > 0 {
5✔
563
                container, err := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
564
                if err != nil {
2✔
565
                        return nil, status.Error(codes.Internal, err.Error())
1✔
566
                }
1✔
567
                exist, err = container.Exists()
×
568
                if err != nil {
×
569
                        return nil, status.Error(codes.Internal, err.Error())
×
570
                }
×
571
        } else {
3✔
572
                if resourceGroupName == "" {
3✔
573
                        resourceGroupName = d.cloud.ResourceGroup
×
574
                }
×
575
                blobClient, err := d.clientFactory.GetBlobContainerClientForSub(subsID)
3✔
576
                if err != nil {
3✔
577
                        return nil, status.Error(codes.Internal, err.Error())
×
578
                }
×
579

580
                blobContainer, err := blobClient.Get(ctx, resourceGroupName, accountName, containerName)
3✔
581
                if err != nil {
4✔
582
                        return nil, status.Error(codes.Internal, err.Error())
1✔
583
                }
1✔
584
                if blobContainer.ContainerProperties == nil {
3✔
585
                        return nil, status.Errorf(codes.Internal, "ContainerProperties of volume(%s) is nil", volumeID)
1✔
586
                }
1✔
587
                exist = blobContainer.ContainerProperties.Deleted != nil && !*blobContainer.ContainerProperties.Deleted
1✔
588
        }
589
        if !exist {
2✔
590
                return nil, status.Errorf(codes.NotFound, "requested volume(%s) does not exist", volumeID)
1✔
591
        }
1✔
592
        klog.V(2).Infof("ValidateVolumeCapabilities on volume(%s) succeeded", volumeID)
×
593

×
594
        // blob driver supports all AccessModes, no need to check capabilities here
×
595
        return &csi.ValidateVolumeCapabilitiesResponse{
×
596
                Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
×
597
                        VolumeCapabilities: req.GetVolumeCapabilities(),
×
598
                },
×
599
                Message: "",
×
600
        }, nil
×
601
}
602

603
// ControllerModifyVolume modify volume
604
func (d *Driver) ControllerModifyVolume(_ context.Context, _ *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
×
605
        return nil, status.Error(codes.Unimplemented, "")
×
606
}
×
607

608
func (d *Driver) ControllerPublishVolume(_ context.Context, _ *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
1✔
609
        return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not yet implemented")
1✔
610
}
1✔
611

612
func (d *Driver) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
1✔
613
        return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not yet implemented")
1✔
614
}
1✔
615

616
// ControllerGetVolume get volume
617
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
1✔
618
        return nil, status.Error(codes.Unimplemented, "ControllerGetVolume is not yet implemented")
1✔
619
}
1✔
620

621
// GetCapacity returns the capacity of the total available storage pool
622
func (d *Driver) GetCapacity(_ context.Context, _ *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
1✔
623
        return nil, status.Error(codes.Unimplemented, "GetCapacity is not yet implemented")
1✔
624
}
1✔
625

626
// ListVolumes return all available volumes
627
func (d *Driver) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
1✔
628
        return nil, status.Error(codes.Unimplemented, "ListVolumes is not yet implemented")
1✔
629
}
1✔
630

631
// CreateSnapshot create snapshot
632
func (d *Driver) CreateSnapshot(_ context.Context, _ *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
1✔
633
        return nil, status.Error(codes.Unimplemented, "CreateSnapshot is not yet implemented")
1✔
634
}
1✔
635

636
// DeleteSnapshot delete snapshot
637
func (d *Driver) DeleteSnapshot(_ context.Context, _ *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
1✔
638
        return nil, status.Error(codes.Unimplemented, "DeleteSnapshot is not yet implemented")
1✔
639
}
1✔
640

641
// ListSnapshots list snapshots
642
func (d *Driver) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
1✔
643
        return nil, status.Error(codes.Unimplemented, "ListSnapshots is not yet implemented")
1✔
644
}
1✔
645

646
// ControllerGetCapabilities returns the capabilities of the Controller plugin
647
func (d *Driver) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
1✔
648
        return &csi.ControllerGetCapabilitiesResponse{
1✔
649
                Capabilities: d.Cap,
1✔
650
        }, nil
1✔
651
}
1✔
652

653
// ControllerExpandVolume controller expand volume
654
func (d *Driver) ControllerExpandVolume(_ context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
4✔
655
        if len(req.GetVolumeId()) == 0 {
5✔
656
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
657
        }
1✔
658

659
        if req.GetCapacityRange() == nil {
4✔
660
                return nil, status.Error(codes.InvalidArgument, "Capacity Range missing in request")
1✔
661
        }
1✔
662

663
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
2✔
664
                return nil, status.Errorf(codes.Internal, "invalid expand volume req: %v", req)
×
665
        }
×
666

667
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
2✔
668
        requestGiB := int64(util.RoundUpGiB(volSizeBytes))
2✔
669

2✔
670
        if volSizeBytes > containerMaxSize {
3✔
671
                return nil, status.Errorf(codes.OutOfRange, "required bytes (%d) exceeds the maximum supported bytes (%d)", volSizeBytes, containerMaxSize)
1✔
672
        }
1✔
673

674
        klog.V(2).Infof("ControllerExpandVolume(%s) successfully, currentQuota: %d Gi", req.VolumeId, requestGiB)
1✔
675

1✔
676
        return &csi.ControllerExpandVolumeResponse{CapacityBytes: req.GetCapacityRange().GetRequiredBytes()}, nil
1✔
677
}
678

679
// CreateBlobContainer creates a blob container
680
func (d *Driver) CreateBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
10✔
681
        if containerName == "" {
11✔
682
                return fmt.Errorf("containerName is empty")
1✔
683
        }
1✔
684
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
18✔
685
                var err error
9✔
686
                if len(secrets) > 0 {
10✔
687
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
688
                        if getErr != nil {
2✔
689
                                return true, getErr
1✔
690
                        }
1✔
691
                        container.Metadata = map[string]string{createdByMetadata: d.Name}
×
692
                        _, err = container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
×
693
                } else {
8✔
694
                        blobContainer := armstorage.BlobContainer{
8✔
695
                                ContainerProperties: &armstorage.ContainerProperties{
8✔
696
                                        PublicAccess: to.Ptr(armstorage.PublicAccessNone),
8✔
697
                                        Metadata:     map[string]*string{createdByMetadata: to.Ptr(d.Name)},
8✔
698
                                },
8✔
699
                        }
8✔
700
                        var blobClient blobcontainerclient.Interface
8✔
701
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
8✔
702
                        if err != nil {
8✔
703
                                return true, err
×
704
                        }
×
705
                        _, err = blobClient.CreateContainer(ctx, resourceGroupName, accountName, containerName, blobContainer)
8✔
706
                }
707
                if err != nil {
12✔
708
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
4✔
709
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) {
6✔
710
                                klog.Warningf("CreateContainer(%s, %s, %s) failed with error(%v), retry", resourceGroupName, accountName, containerName, err)
2✔
711
                                return false, nil
2✔
712
                        }
2✔
713
                }
714
                return true, err
6✔
715
        })
716
}
717

718
// DeleteBlobContainer deletes a blob container
719
func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
8✔
720
        if containerName == "" {
9✔
721
                return fmt.Errorf("containerName is empty")
1✔
722
        }
1✔
723
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
14✔
724
                var err error
7✔
725
                if len(secrets) > 0 {
10✔
726
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
3✔
727
                        if getErr != nil {
6✔
728
                                return true, getErr
3✔
729
                        }
3✔
730
                        _, err = container.DeleteIfExists(nil)
×
731
                } else {
4✔
732
                        var blobClient blobcontainerclient.Interface
4✔
733
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
4✔
734
                        if err != nil {
4✔
735
                                return true, err
×
736
                        }
×
737
                        err = blobClient.DeleteContainer(ctx, resourceGroupName, accountName, containerName)
4✔
738
                }
739
                if err != nil {
7✔
740
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
3✔
741
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) ||
3✔
742
                                strings.Contains(err.Error(), statusCodeNotFound) ||
3✔
743
                                strings.Contains(err.Error(), httpCodeNotFound) {
5✔
744
                                klog.Warningf("delete container(%s) on account(%s) failed with error(%v), return as success", containerName, accountName, err)
2✔
745
                                return true, nil
2✔
746
                        }
2✔
747
                        return false, fmt.Errorf("failed to delete container(%s) on account(%s), error: %w", containerName, accountName, err)
1✔
748
                }
749
                return true, err
1✔
750
        })
751
}
752

753
// copyBlobContainer copies source volume content into a destination volume
754
func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName string, dstAccountSasToken string, authAzcopyEnv []string, dstContainerName string, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
7✔
755
        var sourceVolumeID string
7✔
756
        if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
14✔
757
                sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
7✔
758

7✔
759
        }
7✔
760
        srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
7✔
761
        if err != nil {
9✔
762
                return status.Error(codes.NotFound, err.Error())
2✔
763
        }
2✔
764
        if dstAccountName == "" {
9✔
765
                dstAccountName = srcAccountName
4✔
766
        }
4✔
767
        if srcAccountName == "" || srcContainerName == "" || dstContainerName == "" {
7✔
768
                return fmt.Errorf("srcAccountName(%s) or srcContainerName(%s) or dstContainerName(%s) is empty", srcAccountName, srcContainerName, dstContainerName)
2✔
769
        }
2✔
770
        srcAccountSasToken := dstAccountSasToken
3✔
771
        if srcAccountName != dstAccountName && dstAccountSasToken != "" {
4✔
772
                srcAccountOptions := &azure.AccountOptions{
1✔
773
                        Name:                srcAccountName,
1✔
774
                        ResourceGroup:       srcResourceGroupName,
1✔
775
                        SubscriptionID:      srcSubscriptionID,
1✔
776
                        GetLatestAccountKey: accountOptions.GetLatestAccountKey,
1✔
777
                }
1✔
778
                if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace); err != nil {
2✔
779
                        return err
1✔
780
                }
1✔
781
        }
782
        srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", srcAccountName, storageEndpointSuffix, srcContainerName, srcAccountSasToken)
2✔
783
        dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", dstAccountName, storageEndpointSuffix, dstContainerName, dstAccountSasToken)
2✔
784

2✔
785
        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
2✔
786
        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
2✔
787
        switch jobState {
2✔
788
        case util.AzcopyJobError, util.AzcopyJobCompleted:
1✔
789
                return err
1✔
790
        case util.AzcopyJobRunning:
1✔
791
                return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
1✔
792
        case util.AzcopyJobNotFound:
×
793
                klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName)
×
794
                execFunc := func() error {
×
795
                        if out, err := d.execAzcopyCopy(srcPath, dstPath, azcopyCloneVolumeOptions, authAzcopyEnv); err != nil {
×
796
                                return fmt.Errorf("exec error: %v, output: %v", err, string(out))
×
797
                        }
×
798
                        return nil
×
799
                }
800
                timeoutFunc := func() error {
×
801
                        _, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
×
802
                        return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
×
803
                }
×
804
                copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
×
805
                if copyErr != nil {
×
806
                        klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, copyErr)
×
807
                } else {
×
808
                        klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
×
809
                }
×
810
                return copyErr
×
811
        }
812
        return err
×
813
}
814

815
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
816
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName string, accountSASToken string, authAzcopyEnv []string, dstContainerName, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
8✔
817
        vs := req.VolumeContentSource
8✔
818
        switch vs.Type.(type) {
8✔
819
        case *csi.VolumeContentSource_Snapshot:
1✔
820
                return status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
1✔
821
        case *csi.VolumeContentSource_Volume:
7✔
822
                return d.copyBlobContainer(ctx, req, accountName, accountSASToken, authAzcopyEnv, dstContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
7✔
823
        default:
×
824
                return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
×
825
        }
826
}
827

828
// execAzcopyCopy exec azcopy copy command
829
func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) {
×
830
        cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
×
831
        cmd.Args = append(cmd.Args, azcopyCopyOptions...)
×
832
        if len(authAzcopyEnv) > 0 {
×
833
                cmd.Env = append(os.Environ(), authAzcopyEnv...)
×
834
        }
×
835
        return cmd.CombinedOutput()
×
836
}
837

838
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
839
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
7✔
840
        azureAuthConfig := d.cloud.Config.AzureAuthConfig
7✔
841
        var authAzcopyEnv []string
7✔
842
        if azureAuthConfig.UseManagedIdentityExtension {
10✔
843
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
3✔
844
                if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
4✔
845
                        klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
1✔
846
                        authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
1✔
847
                } else {
3✔
848
                        klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
2✔
849
                }
2✔
850
                return authAzcopyEnv, nil
3✔
851
        }
852
        if len(azureAuthConfig.AADClientSecret) > 0 {
6✔
853
                klog.V(2).Infof("use service principal to authorize azcopy")
2✔
854
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
2✔
855
                if azureAuthConfig.AADClientID == "" || azureAuthConfig.TenantID == "" {
3✔
856
                        return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
1✔
857
                }
1✔
858
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
1✔
859
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
1✔
860
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureAuthConfig.TenantID))
1✔
861
                klog.V(2).Infof(fmt.Sprintf("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureAuthConfig.TenantID))
1✔
862

1✔
863
                return authAzcopyEnv, nil
1✔
864
        }
865
        return []string{}, fmt.Errorf("service principle or managed identity are both not set")
2✔
866
}
867

868
// getAzcopyAuth will only generate sas token for azcopy in following conditions:
869
// 1. secrets is not empty
870
// 2. driver is not using managed identity and service principal
871
// 3. azcopy returns AuthorizationPermissionMismatch error when using service principal or managed identity
872
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string) (string, []string, error) {
6✔
873
        var authAzcopyEnv []string
6✔
874
        var err error
6✔
875
        useSasToken := false
6✔
876
        if !d.useDataPlaneAPI("", accountName) && len(secrets) == 0 && len(secretName) == 0 {
8✔
877
                // search in cache first
2✔
878
                if cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault); err == nil && cache != nil {
2✔
879
                        klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
880
                        return cache.(string), nil, nil
×
881
                }
×
882

883
                authAzcopyEnv, err = d.authorizeAzcopyWithIdentity()
2✔
884
                if err != nil {
3✔
885
                        klog.Warningf("failed to authorize azcopy with identity, error: %v", err)
1✔
886
                } else {
2✔
887
                        if len(authAzcopyEnv) > 0 {
2✔
888
                                out, testErr := d.azcopy.TestListJobs(accountName, storageEndpointSuffix, authAzcopyEnv)
1✔
889
                                if testErr != nil {
1✔
890
                                        return "", nil, fmt.Errorf("azcopy list command failed with error(%v): %v", testErr, out)
×
891
                                }
×
892
                                if strings.Contains(out, authorizationPermissionMismatch) {
1✔
893
                                        klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage Blob Data Contributor\" role to controller identity, fall back to use sas token, original output: %v", out)
×
894
                                        useSasToken = true
×
895
                                }
×
896
                        }
897
                }
898
        }
899

900
        if len(secrets) > 0 || len(secretName) > 0 || len(authAzcopyEnv) == 0 || useSasToken {
11✔
901
                if accountKey == "" {
10✔
902
                        if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
7✔
903
                                return "", nil, err
2✔
904
                        }
2✔
905
                }
906
                klog.V(2).Infof("generate sas token for account(%s)", accountName)
3✔
907
                sasToken, err := d.generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
3✔
908
                return sasToken, nil, err
3✔
909
        }
910
        return "", authAzcopyEnv, nil
1✔
911
}
912

913
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
914
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
30✔
915
        if len(volCaps) == 0 {
32✔
916
                return fmt.Errorf("volume capabilities missing in request")
2✔
917
        }
2✔
918
        for _, c := range volCaps {
56✔
919
                if c.GetBlock() != nil {
30✔
920
                        return fmt.Errorf("block volume capability not supported")
2✔
921
                }
2✔
922
        }
923
        return nil
26✔
924
}
925

926
func parseDays(dayStr string) (int32, error) {
3✔
927
        days, err := strconv.Atoi(dayStr)
3✔
928
        if err != nil {
4✔
929
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class", softDeleteBlobsField, dayStr))
1✔
930
        }
1✔
931
        if days <= 0 || days > 365 {
3✔
932
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class, should be in range [1, 365]", softDeleteBlobsField, dayStr))
1✔
933
        }
1✔
934

935
        return int32(days), nil
1✔
936
}
937

938
// generateSASToken generate a sas token for storage account
939
func (d *Driver) generateSASToken(accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
5✔
940
        // search in cache first
5✔
941
        cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault)
5✔
942
        if err != nil {
5✔
943
                return "", fmt.Errorf("get(%s) from azcopySasTokenCache failed with error: %v", accountName, err)
×
944
        }
×
945
        if cache != nil {
5✔
946
                klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
947
                return cache.(string), nil
×
948
        }
×
949

950
        credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
5✔
951
        if err != nil {
8✔
952
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, err: %s", accountName, err.Error()))
3✔
953
        }
3✔
954
        clientOptions := service.ClientOptions{}
2✔
955
        clientOptions.InsecureAllowCredentialWithHTTP = true
2✔
956
        serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, &clientOptions)
2✔
957
        if err != nil {
2✔
958
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %s", accountName, err.Error()))
×
959
        }
×
960
        sasURL, err := serviceClient.GetSASURL(
2✔
961
                sas.AccountResourceTypes{Object: true, Service: false, Container: true},
2✔
962
                sas.AccountPermissions{Read: true, List: true, Write: true},
2✔
963
                time.Now().Add(time.Duration(expiryTime)*time.Minute),
2✔
964
                &service.GetSASURLOptions{StartTime: to.Ptr(time.Now())},
2✔
965
        )
2✔
966
        if err != nil {
2✔
967
                return "", err
×
968
        }
×
969
        u, err := url.Parse(sasURL)
2✔
970
        if err != nil {
2✔
971
                return "", err
×
972
        }
×
973
        sasToken := "?" + u.RawQuery
2✔
974
        d.azcopySasTokenCache.Set(accountName, sasToken)
2✔
975
        return sasToken, nil
2✔
976
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc