• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 9067400353

13 May 2024 05:42PM UTC coverage: 74.326%. Remained the same
9067400353

Pull #1392

github

web-flow
chore(deps): bump build-image/debian-base in /pkg/blobplugin

Bumps build-image/debian-base from bookworm-v1.0.2 to bookworm-v1.0.3.

---
updated-dependencies:
- dependency-name: build-image/debian-base
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #1392: chore(deps): bump build-image/debian-base from bookworm-v1.0.2 to bookworm-v1.0.3 in /pkg/blobplugin

2206 of 2968 relevant lines covered (74.33%)

7.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.68
/pkg/blob/controllerserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "os"
24
        "os/exec"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        "google.golang.org/grpc/codes"
30
        "google.golang.org/grpc/status"
31

32
        "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
33
        "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
34
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
35
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
36
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
37
        "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
38
        azstorage "github.com/Azure/azure-sdk-for-go/storage"
39
        "github.com/container-storage-interface/spec/lib/go/csi"
40

41
        "k8s.io/apimachinery/pkg/util/wait"
42
        "k8s.io/klog/v2"
43
        "k8s.io/utils/pointer"
44

45
        "sigs.k8s.io/blob-csi-driver/pkg/util"
46
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient/blobcontainerclient"
47
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
48
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
49
        "sigs.k8s.io/cloud-provider-azure/pkg/provider"
50
        azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
51
)
52

53
const (
	// privateEndpoint is the networkEndpointType value (compared
	// case-insensitively) that requests a private endpoint for the account.
	privateEndpoint = "privateendpoint"

	// Names of the environment variables used to configure azcopy authentication.
	azcopyAutoLoginType    = "AZCOPY_AUTO_LOGIN_TYPE"
	azcopySPAApplicationID = "AZCOPY_SPA_APPLICATION_ID"
	azcopySPAClientSecret  = "AZCOPY_SPA_CLIENT_SECRET"
	azcopyTenantID         = "AZCOPY_TENANT_ID"
	azcopyMSIClientID      = "AZCOPY_MSI_CLIENT_ID"

	// MSI and SPN are string identifiers for managed-identity and
	// service-principal login.
	// NOTE(review): presumably these are the values assigned to
	// AZCOPY_AUTO_LOGIN_TYPE — confirm against the azcopy auth helper.
	MSI = "MSI"
	SPN = "SPN"

	// authorizationPermissionMismatch is the "AuthorizationPermissionMismatch"
	// error string.
	// NOTE(review): presumably matched against azcopy/storage error output — confirm.
	authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
)
65

66
// CreateVolume provisions a volume
67
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
22✔
68
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
22✔
69
                klog.Errorf("invalid create volume req: %v", req)
×
70
                return nil, err
×
71
        }
×
72

73
        volName := req.GetName()
22✔
74
        if len(volName) == 0 {
23✔
75
                return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
1✔
76
        }
1✔
77

78
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
23✔
79
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
80
        }
2✔
81

82
        if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
19✔
83
                // logging the job status if it's volume cloning
×
84
                if req.GetVolumeContentSource() != nil {
×
85
                        jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
×
86
                        return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
×
87
                }
×
88
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
×
89
        }
90
        defer d.volumeLocks.Release(volName)
19✔
91

19✔
92
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
19✔
93
        requestGiB := int(util.RoundUpGiB(volSizeBytes))
19✔
94

19✔
95
        parameters := req.GetParameters()
19✔
96
        if parameters == nil {
20✔
97
                parameters = make(map[string]string)
1✔
98
        }
1✔
99
        var storageAccountType, subsID, resourceGroup, location, account, containerName, containerNamePrefix, protocol, customTags, secretName, secretNamespace, pvcNamespace string
19✔
100
        var isHnsEnabled, requireInfraEncryption, enableBlobVersioning, createPrivateEndpoint, enableNfsV3 *bool
19✔
101
        var vnetResourceGroup, vnetName, subnetName, accessTier, networkEndpointType, storageEndpointSuffix, fsGroupChangePolicy string
19✔
102
        var matchTags, useDataPlaneAPI, getLatestAccountKey bool
19✔
103
        var softDeleteBlobs, softDeleteContainers int32
19✔
104
        var vnetResourceIDs []string
19✔
105
        var err error
19✔
106
        // set allowBlobPublicAccess as false by default
19✔
107
        allowBlobPublicAccess := pointer.Bool(false)
19✔
108

19✔
109
        containerNameReplaceMap := map[string]string{}
19✔
110

19✔
111
        // store account key to k8s secret by default
19✔
112
        storeAccountKey := true
19✔
113

19✔
114
        // Apply ProvisionerParameters (case-insensitive). We leave validation of
19✔
115
        // the values to the cloud provider.
19✔
116
        for k, v := range parameters {
108✔
117
                switch strings.ToLower(k) {
89✔
118
                case skuNameField:
8✔
119
                        storageAccountType = v
8✔
120
                case storageAccountTypeField:
9✔
121
                        storageAccountType = v
9✔
122
                case locationField:
8✔
123
                        location = v
8✔
124
                case storageAccountField:
9✔
125
                        account = v
9✔
126
                case subscriptionIDField:
×
127
                        subsID = v
×
128
                case resourceGroupField:
8✔
129
                        resourceGroup = v
8✔
130
                case containerNameField:
9✔
131
                        containerName = v
9✔
132
                case containerNamePrefixField:
2✔
133
                        containerNamePrefix = v
2✔
134
                case protocolField:
9✔
135
                        protocol = v
9✔
136
                case tagsField:
1✔
137
                        customTags = v
1✔
138
                case matchTagsField:
1✔
139
                        matchTags = strings.EqualFold(v, trueValue)
1✔
140
                case secretNameField:
×
141
                        secretName = v
×
142
                case secretNamespaceField:
×
143
                        secretNamespace = v
×
144
                case isHnsEnabledField:
×
145
                        if strings.EqualFold(v, trueValue) {
×
146
                                isHnsEnabled = pointer.Bool(true)
×
147
                        }
×
148
                case softDeleteBlobsField:
×
149
                        days, err := parseDays(v)
×
150
                        if err != nil {
×
151
                                return nil, err
×
152
                        }
×
153
                        softDeleteBlobs = days
×
154
                case softDeleteContainersField:
×
155
                        days, err := parseDays(v)
×
156
                        if err != nil {
×
157
                                return nil, err
×
158
                        }
×
159
                        softDeleteContainers = days
×
160
                case enableBlobVersioningField:
×
161
                        enableBlobVersioning = pointer.Bool(strings.EqualFold(v, trueValue))
×
162
                case storeAccountKeyField:
3✔
163
                        if strings.EqualFold(v, falseValue) {
5✔
164
                                storeAccountKey = false
2✔
165
                        }
2✔
166
                case getLatestAccountKeyField:
1✔
167
                        if getLatestAccountKey, err = strconv.ParseBool(v); err != nil {
2✔
168
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", getLatestAccountKeyField, v)
1✔
169
                        }
1✔
170
                case allowBlobPublicAccessField:
×
171
                        if strings.EqualFold(v, trueValue) {
×
172
                                allowBlobPublicAccess = pointer.Bool(true)
×
173
                        }
×
174
                case requireInfraEncryptionField:
×
175
                        if strings.EqualFold(v, trueValue) {
×
176
                                requireInfraEncryption = pointer.Bool(true)
×
177
                        }
×
178
                case pvcNamespaceKey:
×
179
                        pvcNamespace = v
×
180
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
181
                case pvcNameKey:
×
182
                        containerNameReplaceMap[pvcNameMetadata] = v
×
183
                case pvNameKey:
×
184
                        containerNameReplaceMap[pvNameMetadata] = v
×
185
                case serverNameField:
×
186
                case storageAuthTypeField:
1✔
187
                case storageIdentityClientIDField:
1✔
188
                case storageIdentityObjectIDField:
1✔
189
                case storageIdentityResourceIDField:
1✔
190
                case msiEndpointField:
1✔
191
                case storageAADEndpointField:
1✔
192
                        // no op, only used in NodeStageVolume
193
                case storageEndpointSuffixField:
×
194
                        storageEndpointSuffix = v
×
195
                case vnetResourceGroupField:
×
196
                        vnetResourceGroup = v
×
197
                case vnetNameField:
×
198
                        vnetName = v
×
199
                case subnetNameField:
1✔
200
                        subnetName = v
1✔
201
                case accessTierField:
×
202
                        accessTier = v
×
203
                case networkEndpointTypeField:
1✔
204
                        networkEndpointType = v
1✔
205
                case mountPermissionsField:
10✔
206
                        // only do validations here, used in NodeStageVolume, NodePublishVolume
10✔
207
                        if v != "" {
20✔
208
                                if _, err := strconv.ParseUint(v, 8, 32); err != nil {
11✔
209
                                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s in storage class", v))
1✔
210
                                }
1✔
211
                        }
212
                case useDataPlaneAPIField:
1✔
213
                        useDataPlaneAPI = strings.EqualFold(v, trueValue)
1✔
214
                case fsGroupChangePolicyField:
1✔
215
                        fsGroupChangePolicy = v
1✔
216
                default:
1✔
217
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid parameter %q in storage class", k))
1✔
218
                }
219
        }
220

221
        if pointer.BoolDeref(enableBlobVersioning, false) {
16✔
222
                if isNFSProtocol(protocol) || pointer.BoolDeref(isHnsEnabled, false) {
×
223
                        return nil, status.Errorf(codes.InvalidArgument, "enableBlobVersioning is not supported for NFS protocol or HNS enabled account")
×
224
                }
×
225
        }
226

227
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
17✔
228
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
229
        }
1✔
230

231
        if matchTags && account != "" {
16✔
232
                return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("matchTags must set as false when storageAccount(%s) is provided", account))
1✔
233
        }
1✔
234

235
        if resourceGroup == "" {
20✔
236
                resourceGroup = d.cloud.ResourceGroup
6✔
237
        }
6✔
238

239
        if secretNamespace == "" {
28✔
240
                if pvcNamespace == "" {
28✔
241
                        secretNamespace = defaultNamespace
14✔
242
                } else {
14✔
243
                        secretNamespace = pvcNamespace
×
244
                }
×
245
        }
246

247
        if protocol == "" {
19✔
248
                protocol = Fuse
5✔
249
        }
5✔
250
        if !isSupportedProtocol(protocol) {
15✔
251
                return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
1✔
252
        }
1✔
253
        if !isSupportedAccessTier(accessTier) {
13✔
254
                return nil, status.Errorf(codes.InvalidArgument, "accessTier(%s) is not supported, supported AccessTier list: %v", accessTier, armstorage.PossibleAccessTierValues())
×
255
        }
×
256

257
        if containerName != "" && containerNamePrefix != "" {
14✔
258
                return nil, status.Errorf(codes.InvalidArgument, "containerName(%s) and containerNamePrefix(%s) could not be specified together", containerName, containerNamePrefix)
1✔
259
        }
1✔
260
        if !isSupportedContainerNamePrefix(containerNamePrefix) {
13✔
261
                return nil, status.Errorf(codes.InvalidArgument, "containerNamePrefix(%s) can only contain lowercase letters, numbers, hyphens, and length should be less than 21", containerNamePrefix)
1✔
262
        }
1✔
263

264
        enableHTTPSTrafficOnly := true
11✔
265
        if strings.EqualFold(networkEndpointType, privateEndpoint) {
12✔
266
                if strings.Contains(subnetName, ",") {
2✔
267
                        return nil, status.Errorf(codes.InvalidArgument, "subnetName(%s) can only contain one subnet for private endpoint", subnetName)
1✔
268
                }
1✔
269
                createPrivateEndpoint = pointer.BoolPtr(true)
×
270
        }
271
        accountKind := string(armstorage.KindStorageV2)
10✔
272
        if isNFSProtocol(protocol) {
11✔
273
                isHnsEnabled = pointer.Bool(true)
1✔
274
                enableNfsV3 = pointer.Bool(true)
1✔
275
                // NFS protocol does not need account key
1✔
276
                storeAccountKey = false
1✔
277
                if !pointer.BoolDeref(createPrivateEndpoint, false) {
2✔
278
                        // set VirtualNetworkResourceIDs for storage account firewall setting
1✔
279
                        subnets := strings.Split(subnetName, ",")
1✔
280
                        for _, subnet := range subnets {
2✔
281
                                subnet = strings.TrimSpace(subnet)
1✔
282
                                vnetResourceID := d.getSubnetResourceID(vnetResourceGroup, vnetName, subnet)
1✔
283
                                klog.V(2).Infof("set vnetResourceID(%s) for NFS protocol", vnetResourceID)
1✔
284
                                vnetResourceIDs = []string{vnetResourceID}
1✔
285
                                if err := d.updateSubnetServiceEndpoints(ctx, vnetResourceGroup, vnetName, subnet); err != nil {
2✔
286
                                        return nil, status.Errorf(codes.Internal, "update service endpoints failed with error: %v", err)
1✔
287
                                }
1✔
288
                        }
289
                }
290
        }
291

292
        if strings.HasPrefix(strings.ToLower(storageAccountType), "premium") {
10✔
293
                accountKind = string(armstorage.KindBlockBlobStorage)
1✔
294
        }
1✔
295
        if IsAzureStackCloud(d.cloud) {
10✔
296
                accountKind = string(armstorage.KindStorage)
1✔
297
                if storageAccountType != "" && storageAccountType != string(armstorage.SKUNameStandardLRS) && storageAccountType != string(armstorage.SKUNamePremiumLRS) {
2✔
298
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("Invalid skuName value: %s, as Azure Stack only supports %s and %s Storage Account types.", storageAccountType, storage.SkuNamePremiumLRS, storage.SkuNameStandardLRS))
1✔
299
                }
1✔
300
        }
301

302
        tags, err := util.ConvertTagsToMap(customTags)
8✔
303
        if err != nil {
9✔
304
                return nil, status.Errorf(codes.InvalidArgument, err.Error())
1✔
305
        }
1✔
306

307
        if strings.TrimSpace(storageEndpointSuffix) == "" {
14✔
308
                storageEndpointSuffix = d.getStorageEndPointSuffix()
7✔
309
        }
7✔
310

311
        accountOptions := &azure.AccountOptions{
7✔
312
                Name:                            account,
7✔
313
                Type:                            storageAccountType,
7✔
314
                Kind:                            accountKind,
7✔
315
                SubscriptionID:                  subsID,
7✔
316
                ResourceGroup:                   resourceGroup,
7✔
317
                Location:                        location,
7✔
318
                EnableHTTPSTrafficOnly:          enableHTTPSTrafficOnly,
7✔
319
                VirtualNetworkResourceIDs:       vnetResourceIDs,
7✔
320
                Tags:                            tags,
7✔
321
                MatchTags:                       matchTags,
7✔
322
                IsHnsEnabled:                    isHnsEnabled,
7✔
323
                EnableNfsV3:                     enableNfsV3,
7✔
324
                AllowBlobPublicAccess:           allowBlobPublicAccess,
7✔
325
                RequireInfrastructureEncryption: requireInfraEncryption,
7✔
326
                VNetResourceGroup:               vnetResourceGroup,
7✔
327
                VNetName:                        vnetName,
7✔
328
                SubnetName:                      subnetName,
7✔
329
                AccessTier:                      accessTier,
7✔
330
                CreatePrivateEndpoint:           createPrivateEndpoint,
7✔
331
                StorageType:                     provider.StorageTypeBlob,
7✔
332
                StorageEndpointSuffix:           storageEndpointSuffix,
7✔
333
                EnableBlobVersioning:            enableBlobVersioning,
7✔
334
                SoftDeleteBlobs:                 softDeleteBlobs,
7✔
335
                SoftDeleteContainers:            softDeleteContainers,
7✔
336
                GetLatestAccountKey:             getLatestAccountKey,
7✔
337
        }
7✔
338

7✔
339
        var volumeID string
7✔
340
        requestName := "controller_create_volume"
7✔
341
        if req.GetVolumeContentSource() != nil {
9✔
342
                switch req.VolumeContentSource.Type.(type) {
2✔
343
                case *csi.VolumeContentSource_Snapshot:
1✔
344
                        requestName = "controller_create_volume_from_snapshot"
1✔
345
                case *csi.VolumeContentSource_Volume:
1✔
346
                        requestName = "controller_create_volume_from_volume"
1✔
347
                }
348
        }
349
        mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
7✔
350
        isOperationSucceeded := false
7✔
351
        defer func() {
14✔
352
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
7✔
353
        }()
7✔
354

355
        var accountKey string
7✔
356
        accountName := account
7✔
357
        secrets := req.GetSecrets()
7✔
358
        if len(secrets) == 0 && accountName == "" {
8✔
359
                if v, ok := d.volMap.Load(volName); ok {
1✔
360
                        accountName = v.(string)
×
361
                } else {
1✔
362
                        lockKey := fmt.Sprintf("%s%s%s%s%s%v", storageAccountType, accountKind, resourceGroup, location, protocol, pointer.BoolDeref(createPrivateEndpoint, false))
1✔
363
                        // search in cache first
1✔
364
                        cache, err := d.accountSearchCache.Get(lockKey, azcache.CacheReadTypeDefault)
1✔
365
                        if err != nil {
1✔
366
                                return nil, status.Errorf(codes.Internal, err.Error())
×
367
                        }
×
368
                        if cache != nil {
1✔
369
                                accountName = cache.(string)
×
370
                        } else {
1✔
371
                                d.volLockMap.LockEntry(lockKey)
1✔
372
                                err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
2✔
373
                                        var retErr error
1✔
374
                                        accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
1✔
375
                                        if isRetriableError(retErr) {
1✔
376
                                                klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
×
377
                                                return false, nil
×
378
                                        }
×
379
                                        return true, retErr
1✔
380
                                })
381
                                d.volLockMap.UnlockEntry(lockKey)
1✔
382
                                if err != nil {
2✔
383
                                        return nil, status.Errorf(codes.Internal, "ensure storage account failed with %v", err)
1✔
384
                                }
1✔
385
                                d.accountSearchCache.Set(lockKey, accountName)
×
386
                                d.volMap.Store(volName, accountName)
×
387
                        }
388
                }
389
        }
390

391
        if pointer.BoolDeref(createPrivateEndpoint, false) && isNFSProtocol(protocol) {
6✔
392
                // As for blobfuse/blobfuse2, serverName, i.e.,AZURE_STORAGE_BLOB_ENDPOINT env variable can't include
×
393
                // "privatelink", issue: https://github.com/Azure/azure-storage-fuse/issues/1014
×
394
                //
×
395
                // And use public endpoint will be befine to blobfuse/blobfuse2, because it will be resolved to private endpoint
×
396
                // by private dns zone, which includes CNAME record, documented here:
×
397
                // https://learn.microsoft.com/en-us/azure/storage/common/storage-private-endpoints?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#dns-changes-for-private-endpoints
×
398
                setKeyValueInMap(parameters, serverNameField, fmt.Sprintf("%s.privatelink.blob.%s", accountName, storageEndpointSuffix))
×
399
        }
×
400

401
        accountOptions.Name = accountName
6✔
402
        if len(secrets) == 0 && useDataPlaneAPI {
7✔
403
                if accountKey == "" {
2✔
404
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
2✔
405
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
406
                        }
1✔
407
                }
408
                secrets = createStorageAccountSecret(accountName, accountKey)
×
409
        }
410

411
        // replace pv/pvc name namespace metadata in subDir
412
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
5✔
413
        validContainerName := containerName
5✔
414
        if validContainerName == "" {
5✔
415
                validContainerName = volName
×
416
                if containerNamePrefix != "" {
×
417
                        validContainerName = containerNamePrefix + "-" + volName
×
418
                }
×
419
                validContainerName = getValidContainerName(validContainerName, protocol)
×
420
                setKeyValueInMap(parameters, containerNameField, validContainerName)
×
421
        }
422

423
        if req.GetVolumeContentSource() != nil {
7✔
424
                accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
2✔
425
                if err != nil {
2✔
426
                        return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
427
                }
×
428
                if err := d.copyVolume(req, accountSASToken, authAzcopyEnv, validContainerName, storageEndpointSuffix); err != nil {
4✔
429
                        return nil, err
2✔
430
                }
2✔
431
        } else {
3✔
432
                klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
3✔
433
                if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
4✔
434
                        return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
1✔
435
                }
1✔
436
        }
437

438
        if storeAccountKey && len(req.GetSecrets()) == 0 {
4✔
439
                if accountKey == "" {
4✔
440
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
3✔
441
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
442
                        }
1✔
443
                }
444

445
                secretName, err := setAzureCredentials(ctx, d.KubeClient, accountName, accountKey, secretNamespace)
1✔
446
                if err != nil {
1✔
447
                        return nil, status.Errorf(codes.Internal, "failed to store storage account key: %v", err)
×
448
                }
×
449
                if secretName != "" {
1✔
450
                        klog.V(2).Infof("store account key to k8s secret(%v) in %s namespace", secretName, secretNamespace)
×
451
                }
×
452
        }
453

454
        var uuid string
1✔
455
        if containerName != "" {
2✔
456
                // add volume name as suffix to differentiate volumeID since "containerName" is specified
1✔
457
                // not necessary for dynamic container name creation since volumeID already contains volume name
1✔
458
                uuid = volName
1✔
459
        }
1✔
460
        volumeID = fmt.Sprintf(volumeIDTemplate, resourceGroup, accountName, validContainerName, uuid, secretNamespace, subsID)
1✔
461
        klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)
1✔
462

1✔
463
        if useDataPlaneAPI {
1✔
464
                d.dataPlaneAPIVolCache.Set(volumeID, "")
×
465
                d.dataPlaneAPIVolCache.Set(accountName, "")
×
466
        }
×
467

468
        isOperationSucceeded = true
1✔
469
        // reset secretNamespace field in VolumeContext
1✔
470
        setKeyValueInMap(parameters, secretNamespaceField, secretNamespace)
1✔
471
        return &csi.CreateVolumeResponse{
1✔
472
                Volume: &csi.Volume{
1✔
473
                        VolumeId:      volumeID,
1✔
474
                        CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
1✔
475
                        VolumeContext: parameters,
1✔
476
                        ContentSource: req.GetVolumeContentSource(),
1✔
477
                },
1✔
478
        }, nil
1✔
479
}
480

481
// DeleteVolume delete a volume
482
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
5✔
483
        volumeID := req.GetVolumeId()
5✔
484
        if len(volumeID) == 0 {
6✔
485
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
486
        }
1✔
487

488
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
4✔
489
                return nil, status.Errorf(codes.Internal, "invalid delete volume req: %v", req)
×
490
        }
×
491

492
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
4✔
493
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
494
        }
×
495
        defer d.volumeLocks.Release(volumeID)
4✔
496

4✔
497
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
4✔
498
        if err != nil {
5✔
499
                // According to CSI Driver Sanity Tester, should succeed when an invalid volume id is used
1✔
500
                klog.Errorf("GetContainerInfo(%s) in DeleteVolume failed with error: %v", volumeID, err)
1✔
501
                return &csi.DeleteVolumeResponse{}, nil
1✔
502
        }
1✔
503

504
        secrets := req.GetSecrets()
3✔
505
        if len(secrets) == 0 && d.useDataPlaneAPI(volumeID, accountName) {
4✔
506
                _, accountName, accountKey, _, _, err := d.GetAuthEnv(ctx, volumeID, "", nil, secrets)
1✔
507
                if err != nil {
2✔
508
                        return nil, status.Errorf(codes.Internal, "GetAuthEnv(%s) failed with %v", volumeID, err)
1✔
509
                }
1✔
510
                if accountName != "" && accountKey != "" {
×
511
                        secrets = createStorageAccountSecret(accountName, accountKey)
×
512
                }
×
513
        }
514

515
        mc := metrics.NewMetricContext(blobCSIDriverName, "controller_delete_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
2✔
516
        isOperationSucceeded := false
2✔
517
        defer func() {
4✔
518
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
2✔
519
        }()
2✔
520

521
        if resourceGroupName == "" {
3✔
522
                resourceGroupName = d.cloud.ResourceGroup
1✔
523
        }
1✔
524
        klog.V(2).Infof("deleting container(%s) rg(%s) account(%s) volumeID(%s)", containerName, resourceGroupName, accountName, volumeID)
2✔
525
        if err := d.DeleteBlobContainer(ctx, subsID, resourceGroupName, accountName, containerName, secrets); err != nil {
4✔
526
                return nil, status.Errorf(codes.Internal, "failed to delete container(%s) under rg(%s) account(%s) volumeID(%s), error: %v", containerName, resourceGroupName, accountName, volumeID, err)
2✔
527
        }
2✔
528

529
        isOperationSucceeded = true
×
530
        klog.V(2).Infof("container(%s) under rg(%s) account(%s) volumeID(%s) is deleted successfully", containerName, resourceGroupName, accountName, volumeID)
×
531
        return &csi.DeleteVolumeResponse{}, nil
×
532
}
533

534
// ValidateVolumeCapabilities return the capabilities of the volume
535
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
8✔
536
        volumeID := req.GetVolumeId()
8✔
537
        if len(volumeID) == 0 {
9✔
538
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
539
        }
1✔
540
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
9✔
541
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
542
        }
2✔
543

544
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
5✔
545
        if err != nil {
6✔
546
                klog.Errorf("GetContainerInfo(%s) in ValidateVolumeCapabilities failed with error: %v", volumeID, err)
1✔
547
                return nil, status.Error(codes.NotFound, err.Error())
1✔
548
        }
1✔
549

550
        var exist bool
4✔
551
        secrets := req.GetSecrets()
4✔
552
        if len(secrets) > 0 {
5✔
553
                container, err := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
554
                if err != nil {
2✔
555
                        return nil, status.Error(codes.Internal, err.Error())
1✔
556
                }
1✔
557
                exist, err = container.Exists()
×
558
                if err != nil {
×
559
                        return nil, status.Error(codes.Internal, err.Error())
×
560
                }
×
561
        } else {
3✔
562
                if resourceGroupName == "" {
3✔
563
                        resourceGroupName = d.cloud.ResourceGroup
×
564
                }
×
565
                blobClient, err := d.clientFactory.GetBlobContainerClientForSub(subsID)
3✔
566
                if err != nil {
3✔
567
                        return nil, status.Error(codes.Internal, err.Error())
×
568
                }
×
569

570
                blobContainer, err := blobClient.Get(ctx, resourceGroupName, accountName, containerName)
3✔
571
                if err != nil {
4✔
572
                        return nil, status.Error(codes.Internal, err.Error())
1✔
573
                }
1✔
574
                if blobContainer.ContainerProperties == nil {
3✔
575
                        return nil, status.Errorf(codes.Internal, "ContainerProperties of volume(%s) is nil", volumeID)
1✔
576
                }
1✔
577
                exist = blobContainer.ContainerProperties.Deleted != nil && !*blobContainer.ContainerProperties.Deleted
1✔
578
        }
579
        if !exist {
2✔
580
                return nil, status.Errorf(codes.NotFound, "requested volume(%s) does not exist", volumeID)
1✔
581
        }
1✔
582
        klog.V(2).Infof("ValidateVolumeCapabilities on volume(%s) succeeded", volumeID)
×
583

×
584
        // blob driver supports all AccessModes, no need to check capabilities here
×
585
        return &csi.ValidateVolumeCapabilitiesResponse{
×
586
                Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
×
587
                        VolumeCapabilities: req.GetVolumeCapabilities(),
×
588
                },
×
589
                Message: "",
×
590
        }, nil
×
591
}
592

593
func (d *Driver) ControllerPublishVolume(_ context.Context, _ *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
1✔
594
        return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not yet implemented")
1✔
595
}
1✔
596

597
func (d *Driver) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
1✔
598
        return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not yet implemented")
1✔
599
}
1✔
600

601
// ControllerGetVolume get volume
602
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
1✔
603
        return nil, status.Error(codes.Unimplemented, "ControllerGetVolume is not yet implemented")
1✔
604
}
1✔
605

606
// GetCapacity returns the capacity of the total available storage pool
607
func (d *Driver) GetCapacity(_ context.Context, _ *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
1✔
608
        return nil, status.Error(codes.Unimplemented, "GetCapacity is not yet implemented")
1✔
609
}
1✔
610

611
// ListVolumes return all available volumes
612
func (d *Driver) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
1✔
613
        return nil, status.Error(codes.Unimplemented, "ListVolumes is not yet implemented")
1✔
614
}
1✔
615

616
// CreateSnapshot create snapshot
617
func (d *Driver) CreateSnapshot(_ context.Context, _ *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
1✔
618
        return nil, status.Error(codes.Unimplemented, "CreateSnapshot is not yet implemented")
1✔
619
}
1✔
620

621
// DeleteSnapshot delete snapshot
622
func (d *Driver) DeleteSnapshot(_ context.Context, _ *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
1✔
623
        return nil, status.Error(codes.Unimplemented, "DeleteSnapshot is not yet implemented")
1✔
624
}
1✔
625

626
// ListSnapshots list snapshots
627
func (d *Driver) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
1✔
628
        return nil, status.Error(codes.Unimplemented, "ListSnapshots is not yet implemented")
1✔
629
}
1✔
630

631
// ControllerGetCapabilities returns the capabilities of the Controller plugin
632
func (d *Driver) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
1✔
633
        return &csi.ControllerGetCapabilitiesResponse{
1✔
634
                Capabilities: d.Cap,
1✔
635
        }, nil
1✔
636
}
1✔
637

638
// ControllerExpandVolume controller expand volume
639
func (d *Driver) ControllerExpandVolume(_ context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
4✔
640
        if len(req.GetVolumeId()) == 0 {
5✔
641
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
642
        }
1✔
643

644
        if req.GetCapacityRange() == nil {
4✔
645
                return nil, status.Error(codes.InvalidArgument, "Capacity Range missing in request")
1✔
646
        }
1✔
647

648
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
2✔
649
                return nil, status.Errorf(codes.Internal, "invalid expand volume req: %v", req)
×
650
        }
×
651

652
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
2✔
653
        requestGiB := int64(util.RoundUpGiB(volSizeBytes))
2✔
654

2✔
655
        if volSizeBytes > containerMaxSize {
3✔
656
                return nil, status.Errorf(codes.OutOfRange, "required bytes (%d) exceeds the maximum supported bytes (%d)", volSizeBytes, containerMaxSize)
1✔
657
        }
1✔
658

659
        klog.V(2).Infof("ControllerExpandVolume(%s) successfully, currentQuota: %d Gi", req.VolumeId, requestGiB)
1✔
660

1✔
661
        return &csi.ControllerExpandVolumeResponse{CapacityBytes: req.GetCapacityRange().GetRequiredBytes()}, nil
1✔
662
}
663

664
// CreateBlobContainer creates a blob container
665
func (d *Driver) CreateBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
9✔
666
        if containerName == "" {
10✔
667
                return fmt.Errorf("containerName is empty")
1✔
668
        }
1✔
669
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
16✔
670
                var err error
8✔
671
                if len(secrets) > 0 {
9✔
672
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
673
                        if getErr != nil {
2✔
674
                                return true, getErr
1✔
675
                        }
1✔
676
                        _, err = container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
×
677
                } else {
7✔
678
                        blobContainer := armstorage.BlobContainer{
7✔
679
                                ContainerProperties: &armstorage.ContainerProperties{
7✔
680
                                        PublicAccess: to.Ptr(armstorage.PublicAccessNone),
7✔
681
                                },
7✔
682
                        }
7✔
683
                        var blobClient blobcontainerclient.Interface
7✔
684
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
7✔
685
                        if err != nil {
7✔
686
                                return true, err
×
687
                        }
×
688
                        _, err = blobClient.CreateContainer(ctx, resourceGroupName, accountName, containerName, blobContainer)
7✔
689
                }
690
                if err != nil {
11✔
691
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
4✔
692
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) {
6✔
693
                                klog.Warningf("CreateContainer(%s, %s, %s) failed with error(%v), retry", resourceGroupName, accountName, containerName, err)
2✔
694
                                return false, nil
2✔
695
                        }
2✔
696
                }
697
                return true, err
5✔
698
        })
699
}
700

701
// DeleteBlobContainer deletes a blob container
702
func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
8✔
703
        if containerName == "" {
9✔
704
                return fmt.Errorf("containerName is empty")
1✔
705
        }
1✔
706
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
14✔
707
                var err error
7✔
708
                if len(secrets) > 0 {
10✔
709
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
3✔
710
                        if getErr != nil {
6✔
711
                                return true, getErr
3✔
712
                        }
3✔
713
                        _, err = container.DeleteIfExists(nil)
×
714
                } else {
4✔
715
                        var blobClient blobcontainerclient.Interface
4✔
716
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
4✔
717
                        if err != nil {
4✔
718
                                return true, err
×
719
                        }
×
720
                        err = blobClient.DeleteContainer(ctx, resourceGroupName, accountName, containerName)
4✔
721
                }
722
                if err != nil {
7✔
723
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
3✔
724
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) ||
3✔
725
                                strings.Contains(err.Error(), statusCodeNotFound) ||
3✔
726
                                strings.Contains(err.Error(), httpCodeNotFound) {
5✔
727
                                klog.Warningf("delete container(%s) on account(%s) failed with error(%v), return as success", containerName, accountName, err)
2✔
728
                                return true, nil
2✔
729
                        }
2✔
730
                        return false, fmt.Errorf("failed to delete container(%s) on account(%s), error: %w", containerName, accountName, err)
1✔
731
                }
732
                return true, err
1✔
733
        })
734
}
735

736
// CopyBlobContainer copies a blob container in the same storage account
737
func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
6✔
738
        var sourceVolumeID string
6✔
739
        if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
12✔
740
                sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
6✔
741

6✔
742
        }
6✔
743
        resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
6✔
744
        if err != nil {
8✔
745
                return status.Error(codes.NotFound, err.Error())
2✔
746
        }
2✔
747
        if srcContainerName == "" || dstContainerName == "" {
6✔
748
                return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
2✔
749
        }
2✔
750

751
        srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
2✔
752
        dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
2✔
753

2✔
754
        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
2✔
755
        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
2✔
756
        switch jobState {
2✔
757
        case util.AzcopyJobError, util.AzcopyJobCompleted:
1✔
758
                return err
1✔
759
        case util.AzcopyJobRunning:
1✔
760
                return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
1✔
761
        case util.AzcopyJobNotFound:
×
762
                klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
×
763
                execFunc := func() error {
×
764
                        cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
×
765
                        if len(authAzcopyEnv) > 0 {
×
766
                                cmd.Env = append(os.Environ(), authAzcopyEnv...)
×
767
                        }
×
768
                        if out, err := cmd.CombinedOutput(); err != nil {
×
769
                                return fmt.Errorf("exec error: %v, output: %v", err, string(out))
×
770
                        }
×
771
                        return nil
×
772
                }
773
                timeoutFunc := func() error {
×
774
                        _, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
×
775
                        return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
×
776
                }
×
777
                copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
×
778
                if copyErr != nil {
×
779
                        klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstPath, copyErr)
×
780
                } else {
×
781
                        klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
×
782
                }
×
783
                return copyErr
×
784
        }
785
        return err
×
786
}
787

788
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
789
func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
8✔
790
        vs := req.VolumeContentSource
8✔
791
        switch vs.Type.(type) {
8✔
792
        case *csi.VolumeContentSource_Snapshot:
2✔
793
                return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
2✔
794
        case *csi.VolumeContentSource_Volume:
6✔
795
                return d.copyBlobContainer(req, accountSASToken, authAzcopyEnv, dstContainerName, storageEndpointSuffix)
6✔
796
        default:
×
797
                return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
×
798
        }
799
}
800

801
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
802
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
7✔
803
        azureAuthConfig := d.cloud.Config.AzureAuthConfig
7✔
804
        var authAzcopyEnv []string
7✔
805
        if azureAuthConfig.UseManagedIdentityExtension {
11✔
806
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
4✔
807
                if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
5✔
808
                        klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
1✔
809
                        authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
1✔
810
                } else {
4✔
811
                        klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
3✔
812
                }
3✔
813
                return authAzcopyEnv, nil
4✔
814
        }
815
        if len(azureAuthConfig.AADClientSecret) > 0 {
5✔
816
                klog.V(2).Infof("use service principal to authorize azcopy")
2✔
817
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
2✔
818
                if azureAuthConfig.AADClientID == "" || azureAuthConfig.TenantID == "" {
3✔
819
                        return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
1✔
820
                }
1✔
821
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
1✔
822
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
1✔
823
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureAuthConfig.TenantID))
1✔
824
                klog.V(2).Infof(fmt.Sprintf("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureAuthConfig.TenantID))
1✔
825

1✔
826
                return authAzcopyEnv, nil
1✔
827
        }
828
        return []string{}, fmt.Errorf("service principle or managed identity are both not set")
1✔
829
}
830

831
// getAzcopyAuth will only generate sas token for azcopy in following conditions:
832
// 1. secrets is not empty
833
// 2. driver is not using managed identity and service principal
834
// 3. azcopy returns AuthorizationPermissionMismatch error when using service principal or managed identity
835
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string) (string, []string, error) {
6✔
836
        var authAzcopyEnv []string
6✔
837
        var err error
6✔
838
        useSasToken := false
6✔
839
        if !d.useDataPlaneAPI("", accountName) && len(secrets) == 0 && len(secretName) == 0 {
8✔
840
                // search in cache first
2✔
841
                if cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault); err == nil && cache != nil {
2✔
842
                        klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
843
                        return cache.(string), nil, nil
×
844
                }
×
845

846
                authAzcopyEnv, err = d.authorizeAzcopyWithIdentity()
2✔
847
                if err != nil {
2✔
848
                        klog.Warningf("failed to authorize azcopy with identity, error: %v", err)
×
849
                } else {
2✔
850
                        if len(authAzcopyEnv) > 0 {
4✔
851
                                out, testErr := d.azcopy.TestListJobs(accountName, storageEndpointSuffix, authAzcopyEnv)
2✔
852
                                if testErr != nil {
2✔
853
                                        return "", nil, fmt.Errorf("azcopy list command failed with error(%v): %v", testErr, out)
×
854
                                }
×
855
                                if strings.Contains(out, authorizationPermissionMismatch) {
2✔
856
                                        klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage Blob Data Contributor\" role to controller identity, fall back to use sas token, original output: %v", out)
×
857
                                        useSasToken = true
×
858
                                }
×
859
                        }
860
                }
861
        }
862

863
        if len(secrets) > 0 || len(secretName) > 0 || len(authAzcopyEnv) == 0 || useSasToken {
10✔
864
                if accountKey == "" {
8✔
865
                        if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
5✔
866
                                return "", nil, err
1✔
867
                        }
1✔
868
                }
869
                klog.V(2).Infof("generate sas token for account(%s)", accountName)
3✔
870
                sasToken, err := d.generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
3✔
871
                return sasToken, nil, err
3✔
872
        }
873
        return "", authAzcopyEnv, nil
2✔
874
}
875

876
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
877
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
28✔
878
        if len(volCaps) == 0 {
30✔
879
                return fmt.Errorf("volume capabilities missing in request")
2✔
880
        }
2✔
881
        for _, c := range volCaps {
52✔
882
                if c.GetBlock() != nil {
28✔
883
                        return fmt.Errorf("block volume capability not supported")
2✔
884
                }
2✔
885
        }
886
        return nil
24✔
887
}
888

889
func parseDays(dayStr string) (int32, error) {
3✔
890
        days, err := strconv.Atoi(dayStr)
3✔
891
        if err != nil {
4✔
892
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class", softDeleteBlobsField, dayStr))
1✔
893
        }
1✔
894
        if days <= 0 || days > 365 {
3✔
895
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class, should be in range [1, 365]", softDeleteBlobsField, dayStr))
1✔
896
        }
1✔
897

898
        return int32(days), nil
1✔
899
}
900

901
// generateSASToken generate a sas token for storage account
902
func (d *Driver) generateSASToken(accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
5✔
903
        // search in cache first
5✔
904
        cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault)
5✔
905
        if err != nil {
5✔
906
                return "", fmt.Errorf("get(%s) from azcopySasTokenCache failed with error: %v", accountName, err)
×
907
        }
×
908
        if cache != nil {
5✔
909
                klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
910
                return cache.(string), nil
×
911
        }
×
912

913
        credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
5✔
914
        if err != nil {
8✔
915
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, err: %s", accountName, err.Error()))
3✔
916
        }
3✔
917
        clientOptions := service.ClientOptions{}
2✔
918
        clientOptions.InsecureAllowCredentialWithHTTP = true
2✔
919
        serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, &clientOptions)
2✔
920
        if err != nil {
2✔
921
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %s", accountName, err.Error()))
×
922
        }
×
923
        sasURL, err := serviceClient.GetSASURL(
2✔
924
                sas.AccountResourceTypes{Object: true, Service: false, Container: true},
2✔
925
                sas.AccountPermissions{Read: true, List: true, Write: true},
2✔
926
                time.Now().Add(time.Duration(expiryTime)*time.Minute),
2✔
927
                &service.GetSASURLOptions{StartTime: to.Ptr(time.Now())},
2✔
928
        )
2✔
929
        if err != nil {
2✔
930
                return "", err
×
931
        }
×
932
        u, err := url.Parse(sasURL)
2✔
933
        if err != nil {
2✔
934
                return "", err
×
935
        }
×
936
        sasToken := "?" + u.RawQuery
2✔
937
        d.azcopySasTokenCache.Set(accountName, sasToken)
2✔
938
        return sasToken, nil
2✔
939
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc