• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 4889231241

05 May 2023 02:18AM UTC coverage: 81.423%. Remained the same
4889231241

push

github

GitHub
Merge pull request #904 from k8s-infra-cherrypick-robot/cherry-pick-892-to-release-1.19

1797 of 2207 relevant lines covered (81.42%)

5.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.31
/pkg/blob/controllerserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "context"
21
        "fmt"
22
        "strconv"
23
        "strings"
24

25
        "google.golang.org/grpc/codes"
26
        "google.golang.org/grpc/status"
27

28
        "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
29
        azstorage "github.com/Azure/azure-sdk-for-go/storage"
30
        "github.com/container-storage-interface/spec/lib/go/csi"
31

32
        "k8s.io/apimachinery/pkg/util/wait"
33
        "k8s.io/klog/v2"
34
        "k8s.io/utils/pointer"
35

36
        "sigs.k8s.io/blob-csi-driver/pkg/util"
37
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
38
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
39
        "sigs.k8s.io/cloud-provider-azure/pkg/provider"
40
        azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
41
)
42

43
// privateEndpoint is the networkEndpointType parameter value that, compared
// case-insensitively in CreateVolume, requests a private endpoint for the
// storage account.
const privateEndpoint = "privateendpoint"
46

47
// CreateVolume provisions a volume
48
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
20✔
49
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
21✔
50
                klog.Errorf("invalid create volume req: %v", req)
1✔
51
                return nil, err
1✔
52
        }
1✔
53

54
        volName := req.GetName()
19✔
55
        if len(volName) == 0 {
20✔
56
                return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
1✔
57
        }
1✔
58

59
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
20✔
60
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
61
        }
2✔
62

63
        if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
16✔
64
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
×
65
        }
×
66
        defer d.volumeLocks.Release(volName)
16✔
67

16✔
68
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
16✔
69
        requestGiB := int(util.RoundUpGiB(volSizeBytes))
16✔
70

16✔
71
        parameters := req.GetParameters()
16✔
72
        if parameters == nil {
17✔
73
                parameters = make(map[string]string)
1✔
74
        }
1✔
75
        var storageAccountType, subsID, resourceGroup, location, account, containerName, containerNamePrefix, protocol, customTags, secretName, secretNamespace, pvcNamespace string
16✔
76
        var isHnsEnabled, requireInfraEncryption *bool
16✔
77
        var vnetResourceGroup, vnetName, subnetName, accessTier, networkEndpointType, storageEndpointSuffix string
16✔
78
        var matchTags, useDataPlaneAPI bool
16✔
79
        // set allowBlobPublicAccess as false by default
16✔
80
        allowBlobPublicAccess := pointer.Bool(false)
16✔
81

16✔
82
        containerNameReplaceMap := map[string]string{}
16✔
83

16✔
84
        // store account key to k8s secret by default
16✔
85
        storeAccountKey := true
16✔
86

16✔
87
        // Apply ProvisionerParameters (case-insensitive). We leave validation of
16✔
88
        // the values to the cloud provider.
16✔
89
        for k, v := range parameters {
105✔
90
                switch strings.ToLower(k) {
89✔
91
                case skuNameField:
9✔
92
                        storageAccountType = v
9✔
93
                case storageAccountTypeField:
10✔
94
                        storageAccountType = v
10✔
95
                case locationField:
9✔
96
                        location = v
9✔
97
                case storageAccountField:
10✔
98
                        account = v
10✔
99
                case subscriptionIDField:
2✔
100
                        subsID = v
2✔
101
                case resourceGroupField:
9✔
102
                        resourceGroup = v
9✔
103
                case containerNameField:
10✔
104
                        containerName = v
10✔
105
                case containerNamePrefixField:
2✔
106
                        containerNamePrefix = v
2✔
107
                case protocolField:
9✔
108
                        protocol = v
9✔
109
                case tagsField:
1✔
110
                        customTags = v
1✔
111
                case matchTagsField:
1✔
112
                        matchTags = strings.EqualFold(v, trueValue)
1✔
113
                case secretNameField:
×
114
                        secretName = v
×
115
                case secretNamespaceField:
×
116
                        secretNamespace = v
×
117
                case isHnsEnabledField:
×
118
                        if strings.EqualFold(v, trueValue) {
×
119
                                isHnsEnabled = pointer.Bool(true)
×
120
                        }
×
121
                case storeAccountKeyField:
4✔
122
                        if strings.EqualFold(v, falseValue) {
7✔
123
                                storeAccountKey = false
3✔
124
                        }
3✔
125
                case allowBlobPublicAccessField:
×
126
                        if strings.EqualFold(v, trueValue) {
×
127
                                allowBlobPublicAccess = pointer.Bool(true)
×
128
                        }
×
129
                case requireInfraEncryptionField:
×
130
                        if strings.EqualFold(v, trueValue) {
×
131
                                requireInfraEncryption = pointer.Bool(true)
×
132
                        }
×
133
                case pvcNamespaceKey:
×
134
                        pvcNamespace = v
×
135
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
136
                case pvcNameKey:
×
137
                        containerNameReplaceMap[pvcNameMetadata] = v
×
138
                case pvNameKey:
×
139
                        containerNameReplaceMap[pvNameMetadata] = v
×
140
                case serverNameField:
×
141
                        // no op, only used in NodeStageVolume
142
                case storageEndpointSuffixField:
×
143
                        storageEndpointSuffix = v
×
144
                case vnetResourceGroupField:
×
145
                        vnetResourceGroup = v
×
146
                case vnetNameField:
×
147
                        vnetName = v
×
148
                case subnetNameField:
×
149
                        subnetName = v
×
150
                case accessTierField:
×
151
                        accessTier = v
×
152
                case networkEndpointTypeField:
×
153
                        networkEndpointType = v
×
154
                case mountPermissionsField:
11✔
155
                        // only do validations here, used in NodeStageVolume, NodePublishVolume
11✔
156
                        if v != "" {
22✔
157
                                if _, err := strconv.ParseUint(v, 8, 32); err != nil {
12✔
158
                                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s in storage class", v))
1✔
159
                                }
1✔
160
                        }
161
                case useDataPlaneAPIField:
1✔
162
                        useDataPlaneAPI = strings.EqualFold(v, trueValue)
1✔
163
                default:
1✔
164
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid parameter %q in storage class", k))
1✔
165
                }
166
        }
167

168
        if matchTags && account != "" {
15✔
169
                return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("matchTags must set as false when storageAccount(%s) is provided", account))
1✔
170
        }
1✔
171

172
        if subsID != "" && subsID != d.cloud.SubscriptionID {
15✔
173
                if protocol == NFS {
3✔
174
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("NFS protocol is not supported in cross subscription(%s)", subsID))
1✔
175
                }
1✔
176
                if !storeAccountKey {
2✔
177
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("storeAccountKey must set as true in cross subscription(%s)", subsID))
1✔
178
                }
1✔
179
        }
180

181
        if resourceGroup == "" {
15✔
182
                resourceGroup = d.cloud.ResourceGroup
4✔
183
        }
4✔
184

185
        if secretNamespace == "" {
22✔
186
                if pvcNamespace == "" {
22✔
187
                        secretNamespace = defaultNamespace
11✔
188
                } else {
11✔
189
                        secretNamespace = pvcNamespace
×
190
                }
×
191
        }
192

193
        if protocol == "" {
15✔
194
                protocol = Fuse
4✔
195
        }
4✔
196
        if !isSupportedProtocol(protocol) {
12✔
197
                return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
1✔
198
        }
1✔
199
        if !isSupportedAccessTier(accessTier) {
10✔
200
                return nil, status.Errorf(codes.InvalidArgument, "accessTier(%s) is not supported, supported AccessTier list: %v", accessTier, storage.PossibleAccessTierValues())
×
201
        }
×
202

203
        if containerName != "" && containerNamePrefix != "" {
11✔
204
                return nil, status.Errorf(codes.InvalidArgument, "containerName(%s) and containerNamePrefix(%s) could not be specified together", containerName, containerNamePrefix)
1✔
205
        }
1✔
206
        if !isSupportedContainerNamePrefix(containerNamePrefix) {
10✔
207
                return nil, status.Errorf(codes.InvalidArgument, "containerNamePrefix(%s) can only contain lowercase letters, numbers, hyphens, and length should be less than 21", containerNamePrefix)
1✔
208
        }
1✔
209

210
        enableHTTPSTrafficOnly := true
8✔
211
        createPrivateEndpoint := false
8✔
212
        if strings.EqualFold(networkEndpointType, privateEndpoint) {
8✔
213
                createPrivateEndpoint = true
×
214
        }
×
215
        accountKind := string(storage.KindStorageV2)
8✔
216
        var (
8✔
217
                vnetResourceIDs []string
8✔
218
                enableNfsV3     *bool
8✔
219
        )
8✔
220
        if protocol == NFS {
9✔
221
                isHnsEnabled = pointer.Bool(true)
1✔
222
                enableNfsV3 = pointer.Bool(true)
1✔
223
                // NFS protocol does not need account key
1✔
224
                storeAccountKey = false
1✔
225
                if !createPrivateEndpoint {
2✔
226
                        // set VirtualNetworkResourceIDs for storage account firewall setting
1✔
227
                        vnetResourceID := d.getSubnetResourceID(vnetResourceGroup, vnetName, subnetName)
1✔
228
                        klog.V(2).Infof("set vnetResourceID(%s) for NFS protocol", vnetResourceID)
1✔
229
                        vnetResourceIDs = []string{vnetResourceID}
1✔
230
                        if err := d.updateSubnetServiceEndpoints(ctx, vnetResourceGroup, vnetName, subnetName); err != nil {
2✔
231
                                return nil, status.Errorf(codes.Internal, "update service endpoints failed with error: %v", err)
1✔
232
                        }
1✔
233
                }
234
        }
235

236
        if strings.HasPrefix(strings.ToLower(storageAccountType), "premium") {
8✔
237
                accountKind = string(storage.KindBlockBlobStorage)
1✔
238
        }
1✔
239
        if IsAzureStackCloud(d.cloud) {
8✔
240
                accountKind = string(storage.KindStorage)
1✔
241
                if storageAccountType != "" && storageAccountType != string(storage.SkuNameStandardLRS) && storageAccountType != string(storage.SkuNamePremiumLRS) {
2✔
242
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("Invalid skuName value: %s, as Azure Stack only supports %s and %s Storage Account types.", storageAccountType, storage.SkuNamePremiumLRS, storage.SkuNameStandardLRS))
1✔
243
                }
1✔
244
        }
245

246
        tags, err := util.ConvertTagsToMap(customTags)
6✔
247
        if err != nil {
7✔
248
                return nil, status.Errorf(codes.InvalidArgument, err.Error())
1✔
249
        }
1✔
250

251
        if strings.TrimSpace(storageEndpointSuffix) == "" {
10✔
252
                if d.cloud.Environment.StorageEndpointSuffix != "" {
5✔
253
                        storageEndpointSuffix = d.cloud.Environment.StorageEndpointSuffix
×
254
                } else {
5✔
255
                        storageEndpointSuffix = defaultStorageEndPointSuffix
5✔
256
                }
5✔
257
        }
258

259
        accountOptions := &azure.AccountOptions{
5✔
260
                Name:                            account,
5✔
261
                Type:                            storageAccountType,
5✔
262
                Kind:                            accountKind,
5✔
263
                SubscriptionID:                  subsID,
5✔
264
                ResourceGroup:                   resourceGroup,
5✔
265
                Location:                        location,
5✔
266
                EnableHTTPSTrafficOnly:          enableHTTPSTrafficOnly,
5✔
267
                VirtualNetworkResourceIDs:       vnetResourceIDs,
5✔
268
                Tags:                            tags,
5✔
269
                MatchTags:                       matchTags,
5✔
270
                IsHnsEnabled:                    isHnsEnabled,
5✔
271
                EnableNfsV3:                     enableNfsV3,
5✔
272
                AllowBlobPublicAccess:           allowBlobPublicAccess,
5✔
273
                RequireInfrastructureEncryption: requireInfraEncryption,
5✔
274
                VNetResourceGroup:               vnetResourceGroup,
5✔
275
                VNetName:                        vnetName,
5✔
276
                SubnetName:                      subnetName,
5✔
277
                AccessTier:                      accessTier,
5✔
278
                CreatePrivateEndpoint:           createPrivateEndpoint,
5✔
279
                StorageType:                     provider.StorageTypeBlob,
5✔
280
                StorageEndpointSuffix:           storageEndpointSuffix,
5✔
281
        }
5✔
282

5✔
283
        var accountKey string
5✔
284
        accountName := account
5✔
285
        secrets := req.GetSecrets()
5✔
286
        if len(secrets) == 0 && accountName == "" {
6✔
287
                if v, ok := d.volMap.Load(volName); ok {
1✔
288
                        accountName = v.(string)
×
289
                } else {
1✔
290
                        lockKey := fmt.Sprintf("%s%s%s%s%s%v", storageAccountType, accountKind, resourceGroup, location, protocol, createPrivateEndpoint)
1✔
291
                        // search in cache first
1✔
292
                        cache, err := d.accountSearchCache.Get(lockKey, azcache.CacheReadTypeDefault)
1✔
293
                        if err != nil {
1✔
294
                                return nil, status.Errorf(codes.Internal, err.Error())
×
295
                        }
×
296
                        if cache != nil {
1✔
297
                                accountName = cache.(string)
×
298
                        } else {
1✔
299
                                d.volLockMap.LockEntry(lockKey)
1✔
300
                                err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
2✔
301
                                        var retErr error
1✔
302
                                        accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
1✔
303
                                        if isRetriableError(retErr) {
1✔
304
                                                klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
×
305
                                                return false, nil
×
306
                                        }
×
307
                                        return true, retErr
1✔
308
                                })
309
                                d.volLockMap.UnlockEntry(lockKey)
1✔
310
                                if err != nil {
2✔
311
                                        return nil, status.Errorf(codes.Internal, "ensure storage account failed with %v", err)
1✔
312
                                }
1✔
313
                                d.accountSearchCache.Set(lockKey, accountName)
×
314
                                d.volMap.Store(volName, accountName)
×
315
                        }
316
                }
317
        }
318

319
        if createPrivateEndpoint && protocol == NFS {
4✔
320
                // As for blobfuse/blobfuse2, serverName, i.e.,AZURE_STORAGE_BLOB_ENDPOINT env variable can't include
×
321
                // "privatelink", issue: https://github.com/Azure/azure-storage-fuse/issues/1014
×
322
                //
×
323
                // And use public endpoint will be befine to blobfuse/blobfuse2, because it will be resolved to private endpoint
×
324
                // by private dns zone, which includes CNAME record, documented here:
×
325
                // https://learn.microsoft.com/en-us/azure/storage/common/storage-private-endpoints?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#dns-changes-for-private-endpoints
×
326
                setKeyValueInMap(parameters, serverNameField, fmt.Sprintf("%s.privatelink.blob.%s", accountName, storageEndpointSuffix))
×
327
        }
×
328

329
        accountOptions.Name = accountName
4✔
330
        if len(secrets) == 0 && useDataPlaneAPI {
5✔
331
                if accountKey == "" {
2✔
332
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
2✔
333
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
334
                        }
1✔
335
                }
336
                secrets = createStorageAccountSecret(accountName, accountKey)
×
337
        }
338

339
        // replace pv/pvc name namespace metadata in subDir
340
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
3✔
341
        validContainerName := containerName
3✔
342
        if validContainerName == "" {
3✔
343
                validContainerName = volName
×
344
                if containerNamePrefix != "" {
×
345
                        validContainerName = containerNamePrefix + "-" + volName
×
346
                }
×
347
                validContainerName = getValidContainerName(validContainerName, protocol)
×
348
                setKeyValueInMap(parameters, containerNameField, validContainerName)
×
349
        }
350

351
        var volumeID string
3✔
352
        mc := metrics.NewMetricContext(blobCSIDriverName, "controller_create_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
3✔
353
        isOperationSucceeded := false
3✔
354
        defer func() {
6✔
355
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
3✔
356
        }()
3✔
357

358
        klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
3✔
359
        if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
4✔
360
                return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
1✔
361
        }
1✔
362

363
        if storeAccountKey && len(req.GetSecrets()) == 0 {
4✔
364
                if accountKey == "" {
4✔
365
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
3✔
366
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
367
                        }
1✔
368
                }
369

370
                secretName, err := setAzureCredentials(d.cloud.KubeClient, accountName, accountKey, secretNamespace)
1✔
371
                if err != nil {
1✔
372
                        return nil, status.Errorf(codes.Internal, "failed to store storage account key: %v", err)
×
373
                }
×
374
                if secretName != "" {
1✔
375
                        klog.V(2).Infof("store account key to k8s secret(%v) in %s namespace", secretName, secretNamespace)
×
376
                }
×
377
        }
378

379
        var uuid string
1✔
380
        if containerName != "" {
2✔
381
                // add volume name as suffix to differentiate volumeID since "containerName" is specified
1✔
382
                // not necessary for dynamic container name creation since volumeID already contains volume name
1✔
383
                uuid = volName
1✔
384
        }
1✔
385
        volumeID = fmt.Sprintf(volumeIDTemplate, resourceGroup, accountName, validContainerName, uuid, secretNamespace, subsID)
1✔
386
        klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)
1✔
387

1✔
388
        if useDataPlaneAPI {
1✔
389
                d.dataPlaneAPIVolCache.Set(volumeID, "")
×
390
                d.dataPlaneAPIVolCache.Set(accountName, "")
×
391
        }
×
392

393
        isOperationSucceeded = true
1✔
394
        // reset secretNamespace field in VolumeContext
1✔
395
        setKeyValueInMap(parameters, secretNamespaceField, secretNamespace)
1✔
396
        return &csi.CreateVolumeResponse{
1✔
397
                Volume: &csi.Volume{
1✔
398
                        VolumeId:      volumeID,
1✔
399
                        CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
1✔
400
                        VolumeContext: parameters,
1✔
401
                },
1✔
402
        }, nil
1✔
403
}
404

405
// DeleteVolume delete a volume
406
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
6✔
407
        volumeID := req.GetVolumeId()
6✔
408
        if len(volumeID) == 0 {
7✔
409
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
410
        }
1✔
411

412
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
6✔
413
                return nil, status.Errorf(codes.Internal, "invalid delete volume req: %v", req)
1✔
414
        }
1✔
415

416
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
4✔
417
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
418
        }
×
419
        defer d.volumeLocks.Release(volumeID)
4✔
420

4✔
421
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
4✔
422
        if err != nil {
5✔
423
                // According to CSI Driver Sanity Tester, should succeed when an invalid volume id is used
1✔
424
                klog.Errorf("GetContainerInfo(%s) in DeleteVolume failed with error: %v", volumeID, err)
1✔
425
                return &csi.DeleteVolumeResponse{}, nil
1✔
426
        }
1✔
427

428
        secrets := req.GetSecrets()
3✔
429
        if len(secrets) == 0 && d.useDataPlaneAPI(volumeID, accountName) {
4✔
430
                _, accountName, accountKey, _, _, err := d.GetAuthEnv(ctx, volumeID, "", nil, secrets)
1✔
431
                if err != nil {
2✔
432
                        return nil, status.Errorf(codes.Internal, "GetAuthEnv(%s) failed with %v", volumeID, err)
1✔
433
                }
1✔
434
                if accountName != "" && accountKey != "" {
×
435
                        secrets = createStorageAccountSecret(accountName, accountKey)
×
436
                }
×
437
        }
438

439
        mc := metrics.NewMetricContext(blobCSIDriverName, "controller_delete_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
2✔
440
        isOperationSucceeded := false
2✔
441
        defer func() {
4✔
442
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
2✔
443
        }()
2✔
444

445
        if resourceGroupName == "" {
3✔
446
                resourceGroupName = d.cloud.ResourceGroup
1✔
447
        }
1✔
448
        klog.V(2).Infof("deleting container(%s) rg(%s) account(%s) volumeID(%s)", containerName, resourceGroupName, accountName, volumeID)
2✔
449
        if err := d.DeleteBlobContainer(ctx, subsID, resourceGroupName, accountName, containerName, secrets); err != nil {
4✔
450
                return nil, status.Errorf(codes.Internal, "failed to delete container(%s) under rg(%s) account(%s) volumeID(%s), error: %v", containerName, resourceGroupName, accountName, volumeID, err)
2✔
451
        }
2✔
452

453
        isOperationSucceeded = true
×
454
        klog.V(2).Infof("container(%s) under rg(%s) account(%s) volumeID(%s) is deleted successfully", containerName, resourceGroupName, accountName, volumeID)
×
455
        return &csi.DeleteVolumeResponse{}, nil
×
456
}
457

458
// ValidateVolumeCapabilities return the capabilities of the volume
459
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
8✔
460
        volumeID := req.GetVolumeId()
8✔
461
        if len(volumeID) == 0 {
9✔
462
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
463
        }
1✔
464
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
9✔
465
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
466
        }
2✔
467

468
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
5✔
469
        if err != nil {
6✔
470
                klog.Errorf("GetContainerInfo(%s) in ValidateVolumeCapabilities failed with error: %v", volumeID, err)
1✔
471
                return nil, status.Error(codes.NotFound, err.Error())
1✔
472
        }
1✔
473

474
        var exist bool
4✔
475
        secrets := req.GetSecrets()
4✔
476
        if len(secrets) > 0 {
5✔
477
                container, err := getContainerReference(containerName, secrets, d.cloud.Environment)
1✔
478
                if err != nil {
2✔
479
                        return nil, status.Error(codes.Internal, err.Error())
1✔
480
                }
1✔
481
                exist, err = container.Exists()
×
482
                if err != nil {
×
483
                        return nil, status.Error(codes.Internal, err.Error())
×
484
                }
×
485
        } else {
3✔
486
                if resourceGroupName == "" {
3✔
487
                        resourceGroupName = d.cloud.ResourceGroup
×
488
                }
×
489
                blobContainer, retryErr := d.cloud.BlobClient.GetContainer(ctx, subsID, resourceGroupName, accountName, containerName)
3✔
490
                err = retryErr.Error()
3✔
491
                if err != nil {
4✔
492
                        return nil, status.Error(codes.Internal, err.Error())
1✔
493
                }
1✔
494
                if blobContainer.ContainerProperties == nil {
3✔
495
                        return nil, status.Errorf(codes.Internal, "ContainerProperties of volume(%s) is nil", volumeID)
1✔
496
                }
1✔
497
                exist = blobContainer.ContainerProperties.Deleted != nil && !*blobContainer.ContainerProperties.Deleted
1✔
498
        }
499
        if !exist {
2✔
500
                return nil, status.Errorf(codes.NotFound, "requested volume(%s) does not exist", volumeID)
1✔
501
        }
1✔
502
        klog.V(2).Infof("ValidateVolumeCapabilities on volume(%s) succeeded", volumeID)
×
503

×
504
        // blob driver supports all AccessModes, no need to check capabilities here
×
505
        return &csi.ValidateVolumeCapabilitiesResponse{
×
506
                Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
×
507
                        VolumeCapabilities: req.GetVolumeCapabilities(),
×
508
                },
×
509
                Message: "",
×
510
        }, nil
×
511
}
512

513
func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
1✔
514
        return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not yet implemented")
1✔
515
}
1✔
516

517
func (d *Driver) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
1✔
518
        return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not yet implemented")
1✔
519
}
1✔
520

521
// ControllerGetVolume get volume
522
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
1✔
523
        return nil, status.Error(codes.Unimplemented, "ControllerGetVolume is not yet implemented")
1✔
524
}
1✔
525

526
// GetCapacity returns the capacity of the total available storage pool
527
func (d *Driver) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
1✔
528
        return nil, status.Error(codes.Unimplemented, "GetCapacity is not yet implemented")
1✔
529
}
1✔
530

531
// ListVolumes return all available volumes
532
func (d *Driver) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
1✔
533
        return nil, status.Error(codes.Unimplemented, "ListVolumes is not yet implemented")
1✔
534
}
1✔
535

536
// CreateSnapshot create snapshot
537
func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
1✔
538
        return nil, status.Error(codes.Unimplemented, "CreateSnapshot is not yet implemented")
1✔
539
}
1✔
540

541
// DeleteSnapshot delete snapshot
542
func (d *Driver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
1✔
543
        return nil, status.Error(codes.Unimplemented, "DeleteSnapshot is not yet implemented")
1✔
544
}
1✔
545

546
// ListSnapshots list snapshots
547
func (d *Driver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
1✔
548
        return nil, status.Error(codes.Unimplemented, "ListSnapshots is not yet implemented")
1✔
549
}
1✔
550

551
// ControllerGetCapabilities returns the capabilities of the Controller plugin
552
func (d *Driver) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
1✔
553
        return &csi.ControllerGetCapabilitiesResponse{
1✔
554
                Capabilities: d.Cap,
1✔
555
        }, nil
1✔
556
}
1✔
557

558
// ControllerExpandVolume controller expand volume
559
func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
5✔
560
        if len(req.GetVolumeId()) == 0 {
6✔
561
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
562
        }
1✔
563

564
        if req.GetCapacityRange() == nil {
5✔
565
                return nil, status.Error(codes.InvalidArgument, "Capacity Range missing in request")
1✔
566
        }
1✔
567

568
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
4✔
569
                return nil, status.Errorf(codes.Internal, "invalid expand volume req: %v", req)
1✔
570
        }
1✔
571

572
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
2✔
573
        requestGiB := int64(util.RoundUpGiB(volSizeBytes))
2✔
574

2✔
575
        if volSizeBytes > containerMaxSize {
3✔
576
                return nil, status.Errorf(codes.OutOfRange, "required bytes (%d) exceeds the maximum supported bytes (%d)", volSizeBytes, containerMaxSize)
1✔
577
        }
1✔
578

579
        klog.V(2).Infof("ControllerExpandVolume(%s) successfully, currentQuota: %d Gi", req.VolumeId, requestGiB)
1✔
580

1✔
581
        return &csi.ControllerExpandVolumeResponse{CapacityBytes: req.GetCapacityRange().GetRequiredBytes()}, nil
1✔
582
}
583

584
// CreateBlobContainer creates a blob container
585
func (d *Driver) CreateBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
9✔
586
        if containerName == "" {
10✔
587
                return fmt.Errorf("containerName is empty")
1✔
588
        }
1✔
589
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
16✔
590
                var err error
8✔
591
                if len(secrets) > 0 {
9✔
592
                        container, getErr := getContainerReference(containerName, secrets, d.cloud.Environment)
1✔
593
                        if getErr != nil {
2✔
594
                                return true, getErr
1✔
595
                        }
1✔
596
                        _, err = container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
×
597
                } else {
7✔
598
                        blobContainer := storage.BlobContainer{
7✔
599
                                ContainerProperties: &storage.ContainerProperties{
7✔
600
                                        PublicAccess: storage.PublicAccessNone,
7✔
601
                                },
7✔
602
                        }
7✔
603
                        err = d.cloud.BlobClient.CreateContainer(ctx, subsID, resourceGroupName, accountName, containerName, blobContainer).Error()
7✔
604
                }
7✔
605
                if err != nil {
11✔
606
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
4✔
607
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) {
7✔
608
                                klog.Warningf("CreateContainer(%s, %s, %s) failed with error(%v), retry", resourceGroupName, accountName, containerName, err)
3✔
609
                                return false, nil
3✔
610
                        }
3✔
611
                }
612
                return true, err
4✔
613
        })
614
}
615

616
// DeleteBlobContainer deletes a blob container
617
func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
8✔
618
        if containerName == "" {
9✔
619
                return fmt.Errorf("containerName is empty")
1✔
620
        }
1✔
621
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
14✔
622
                var err error
7✔
623
                if len(secrets) > 0 {
10✔
624
                        container, getErr := getContainerReference(containerName, secrets, d.cloud.Environment)
3✔
625
                        if getErr != nil {
6✔
626
                                return true, getErr
3✔
627
                        }
3✔
628
                        _, err = container.DeleteIfExists(nil)
×
629
                } else {
4✔
630
                        err = d.cloud.BlobClient.DeleteContainer(ctx, subsID, resourceGroupName, accountName, containerName).Error()
4✔
631
                }
4✔
632
                if err != nil {
7✔
633
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
3✔
634
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) ||
3✔
635
                                strings.Contains(err.Error(), statusCodeNotFound) ||
3✔
636
                                strings.Contains(err.Error(), httpCodeNotFound) {
5✔
637
                                klog.Warningf("delete container(%s) on account(%s) failed with error(%v), return as success", containerName, accountName, err)
2✔
638
                                return true, nil
2✔
639
                        }
2✔
640
                        return false, fmt.Errorf("failed to delete container(%s) on account(%s), error: %w", containerName, accountName, err)
1✔
641
                }
642
                return true, err
1✔
643
        })
644
}
645

646
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
647
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
25✔
648
        if len(volCaps) == 0 {
27✔
649
                return fmt.Errorf("volume capabilities missing in request")
2✔
650
        }
2✔
651
        for _, c := range volCaps {
46✔
652
                if c.GetBlock() != nil {
25✔
653
                        return fmt.Errorf("block volume capability not supported")
2✔
654
                }
2✔
655
        }
656
        return nil
21✔
657
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc