• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / azuredisk-csi-driver / 15660492627

15 Jun 2025 06:44AM UTC coverage: 59.438% (-0.05%) from 59.486%
15660492627

Pull #3140

github

andyzhangx
test: fix golint errors
Pull Request #3140: [release-1.30] test: fix golint errors

6 of 6 new or added lines in 2 files covered. (100.0%)

3 existing lines in 1 file now uncovered.

3725 of 6267 relevant lines covered (59.44%)

5.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.73
/pkg/azuredisk/azuredisk.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package azuredisk
18

19
import (
20
        "context"
21
        "encoding/json"
22
        "errors"
23
        "fmt"
24
        "reflect"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v5"
30
        "github.com/container-storage-interface/spec/lib/go/csi"
31
        "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
32

33
        "google.golang.org/grpc"
34
        "google.golang.org/grpc/codes"
35
        "google.golang.org/grpc/status"
36

37
        corev1 "k8s.io/api/core/v1"
38
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39
        "k8s.io/apimachinery/pkg/types"
40
        k8stypes "k8s.io/apimachinery/pkg/types"
41
        "k8s.io/apimachinery/pkg/util/wait"
42
        "k8s.io/client-go/kubernetes"
43
        clientset "k8s.io/client-go/kubernetes"
44
        "k8s.io/klog/v2"
45
        "k8s.io/kubernetes/pkg/volume/util/hostutil"
46
        "k8s.io/mount-utils"
47
        "k8s.io/utils/pointer"
48

49
        consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
50
        "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
51
        csicommon "sigs.k8s.io/azuredisk-csi-driver/pkg/csi-common"
52
        "sigs.k8s.io/azuredisk-csi-driver/pkg/mounter"
53
        "sigs.k8s.io/azuredisk-csi-driver/pkg/optimization"
54
        volumehelper "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
55
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient"
56
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
57
        azurecloudconsts "sigs.k8s.io/cloud-provider-azure/pkg/consts"
58
        azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
59
)
60

61
var (
62
        // taintRemovalInitialDelay is the initial delay for node taint removal
63
        taintRemovalInitialDelay = 1 * time.Second
64
        // taintRemovalBackoff is the exponential backoff configuration for node taint removal
65
        taintRemovalBackoff = wait.Backoff{
66
                Duration: 500 * time.Millisecond,
67
                Factor:   2,
68
                Steps:    10, // Max delay = 0.5 * 2^9 = ~4 minutes
69
        }
70
)
71

72
// CSIDriver defines the interface for a CSI driver.
73
type CSIDriver interface {
74
        csi.ControllerServer
75
        csi.NodeServer
76
        csi.IdentityServer
77

78
        Run(ctx context.Context) error
79
}
80

81
type hostUtil interface {
82
        PathIsDevice(string) (bool, error)
83
}
84

85
// DriverCore contains fields common to both the V1 and V2 driver, and implements all interfaces of CSI drivers
86
type DriverCore struct {
87
        csicommon.CSIDriver
88
        perfOptimizationEnabled      bool
89
        cloudConfigSecretName        string
90
        cloudConfigSecretNamespace   string
91
        customUserAgent              string
92
        userAgentSuffix              string
93
        cloud                        *azure.Cloud
94
        clientFactory                azclient.ClientFactory
95
        diskController               *ManagedDiskController
96
        mounter                      *mount.SafeFormatAndMount
97
        deviceHelper                 optimization.Interface
98
        nodeInfo                     *optimization.NodeInfo
99
        ioHandler                    azureutils.IOHandler
100
        hostUtil                     hostUtil
101
        useCSIProxyGAInterface       bool
102
        enableDiskOnlineResize       bool
103
        allowEmptyCloudConfig        bool
104
        enableListVolumes            bool
105
        enableListSnapshots          bool
106
        supportZone                  bool
107
        getNodeInfoFromLabels        bool
108
        enableDiskCapacityCheck      bool
109
        disableUpdateCache           bool
110
        enableTrafficManager         bool
111
        trafficManagerPort           int64
112
        vmssCacheTTLInSeconds        int64
113
        volStatsCacheExpireInMinutes int64
114
        attachDetachInitialDelayInMs int64
115
        getDiskTimeoutInSeconds      int64
116
        vmType                       string
117
        enableWindowsHostProcess     bool
118
        getNodeIDFromIMDS            bool
119
        enableOtelTracing            bool
120
        shouldWaitForSnapshotReady   bool
121
        checkDiskLUNCollision        bool
122
        checkDiskCountForBatching    bool
123
        forceDetachBackoff           bool
124
        waitForDetach                bool
125
        endpoint                     string
126
        disableAVSetNodes            bool
127
        removeNotReadyTaint          bool
128
        kubeClient                   kubernetes.Interface
129
        // a timed cache storing volume stats <volumeID, volumeStats>
130
        volStatsCache           azcache.Resource
131
        maxConcurrentFormat     int64
132
        concurrentFormatTimeout int64
133
}
134

135
// Driver is the v1 implementation of the Azure Disk CSI Driver.
136
type Driver struct {
137
        DriverCore
138
        volumeLocks *volumehelper.VolumeLocks
139
        // a timed cache for throttling
140
        throttlingCache azcache.Resource
141
        // a timed cache for disk lun collision check throttling
142
        checkDiskLunThrottlingCache azcache.Resource
143
}
144

145
// newDriverV1 Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
146
// does not support optional driver plugin info manifest field. Refer to CSI spec for more details.
147
func newDriverV1(options *DriverOptions) *Driver {
3✔
148
        driver := Driver{}
3✔
149
        driver.Name = options.DriverName
3✔
150
        driver.Version = driverVersion
3✔
151
        driver.NodeID = options.NodeID
3✔
152
        driver.VolumeAttachLimit = options.VolumeAttachLimit
3✔
153
        driver.ReservedDataDiskSlotNum = options.ReservedDataDiskSlotNum
3✔
154
        driver.perfOptimizationEnabled = options.EnablePerfOptimization
3✔
155
        driver.cloudConfigSecretName = options.CloudConfigSecretName
3✔
156
        driver.cloudConfigSecretNamespace = options.CloudConfigSecretNamespace
3✔
157
        driver.customUserAgent = options.CustomUserAgent
3✔
158
        driver.userAgentSuffix = options.UserAgentSuffix
3✔
159
        driver.useCSIProxyGAInterface = options.UseCSIProxyGAInterface
3✔
160
        driver.enableDiskOnlineResize = options.EnableDiskOnlineResize
3✔
161
        driver.allowEmptyCloudConfig = options.AllowEmptyCloudConfig
3✔
162
        driver.enableListVolumes = options.EnableListVolumes
3✔
163
        driver.enableListSnapshots = options.EnableListVolumes
3✔
164
        driver.supportZone = options.SupportZone
3✔
165
        driver.getNodeInfoFromLabels = options.GetNodeInfoFromLabels
3✔
166
        driver.enableDiskCapacityCheck = options.EnableDiskCapacityCheck
3✔
167
        driver.disableUpdateCache = options.DisableUpdateCache
3✔
168
        driver.attachDetachInitialDelayInMs = options.AttachDetachInitialDelayInMs
3✔
169
        driver.enableTrafficManager = options.EnableTrafficManager
3✔
170
        driver.trafficManagerPort = options.TrafficManagerPort
3✔
171
        driver.vmssCacheTTLInSeconds = options.VMSSCacheTTLInSeconds
3✔
172
        driver.volStatsCacheExpireInMinutes = options.VolStatsCacheExpireInMinutes
3✔
173
        driver.getDiskTimeoutInSeconds = options.GetDiskTimeoutInSeconds
3✔
174
        driver.vmType = options.VMType
3✔
175
        driver.enableWindowsHostProcess = options.EnableWindowsHostProcess
3✔
176
        driver.getNodeIDFromIMDS = options.GetNodeIDFromIMDS
3✔
177
        driver.enableOtelTracing = options.EnableOtelTracing
3✔
178
        driver.shouldWaitForSnapshotReady = options.WaitForSnapshotReady
3✔
179
        driver.checkDiskLUNCollision = options.CheckDiskLUNCollision
3✔
180
        driver.checkDiskCountForBatching = options.CheckDiskCountForBatching
3✔
181
        driver.forceDetachBackoff = options.ForceDetachBackoff
3✔
182
        driver.waitForDetach = options.WaitForDetach
3✔
183
        driver.endpoint = options.Endpoint
3✔
184
        driver.disableAVSetNodes = options.DisableAVSetNodes
3✔
185
        driver.removeNotReadyTaint = options.RemoveNotReadyTaint
3✔
186
        driver.maxConcurrentFormat = options.MaxConcurrentFormat
3✔
187
        driver.concurrentFormatTimeout = options.ConcurrentFormatTimeout
3✔
188
        driver.volumeLocks = volumehelper.NewVolumeLocks()
3✔
189
        driver.ioHandler = azureutils.NewOSIOHandler()
3✔
190
        driver.hostUtil = hostutil.NewHostUtil()
3✔
191

3✔
192
        if driver.NodeID == "" {
6✔
193
                // nodeid is not needed in controller component
3✔
194
                klog.Warning("nodeid is empty")
3✔
195
        }
3✔
196
        topologyKey = fmt.Sprintf("topology.%s/zone", driver.Name)
3✔
197

3✔
198
        getter := func(_ string) (interface{}, error) { return nil, nil }
3✔
199
        var err error
3✔
200
        if driver.throttlingCache, err = azcache.NewTimedCache(5*time.Minute, getter, false); err != nil {
3✔
201
                klog.Fatalf("%v", err)
×
202
        }
×
203
        if driver.checkDiskLunThrottlingCache, err = azcache.NewTimedCache(30*time.Minute, getter, false); err != nil {
3✔
204
                klog.Fatalf("%v", err)
×
205
        }
×
206

207
        if options.VolStatsCacheExpireInMinutes <= 0 {
6✔
208
                options.VolStatsCacheExpireInMinutes = 10 // default expire in 10 minutes
3✔
209
        }
3✔
210
        if driver.volStatsCache, err = azcache.NewTimedCache(time.Duration(options.VolStatsCacheExpireInMinutes)*time.Minute, getter, false); err != nil {
3✔
211
                klog.Fatalf("%v", err)
×
212
        }
×
213

214
        userAgent := GetUserAgent(driver.Name, driver.customUserAgent, driver.userAgentSuffix)
3✔
215
        klog.V(2).Infof("driver userAgent: %s", userAgent)
3✔
216

3✔
217
        kubeClient, err := azureutils.GetKubeClient(options.Kubeconfig)
3✔
218
        if err != nil {
6✔
219
                klog.Warningf("get kubeconfig(%s) failed with error: %v", options.Kubeconfig, err)
3✔
220
        }
3✔
221
        driver.kubeClient = kubeClient
3✔
222

3✔
223
        cloud, err := azureutils.GetCloudProviderFromClient(context.Background(), kubeClient, driver.cloudConfigSecretName, driver.cloudConfigSecretNamespace,
3✔
224
                userAgent, driver.allowEmptyCloudConfig, driver.enableTrafficManager, driver.trafficManagerPort)
3✔
225
        if err != nil {
3✔
226
                klog.Fatalf("failed to get Azure Cloud Provider, error: %v", err)
×
227
        }
×
228
        driver.cloud = cloud
3✔
229

3✔
230
        if driver.cloud != nil {
6✔
231
                driver.clientFactory = driver.cloud.ComputeClientFactory
3✔
232
                if driver.vmType != "" {
5✔
233
                        klog.V(2).Infof("override VMType(%s) in cloud config as %s", driver.cloud.VMType, driver.vmType)
2✔
234
                        driver.cloud.VMType = driver.vmType
2✔
235
                }
2✔
236

237
                if driver.NodeID == "" {
6✔
238
                        // Disable UseInstanceMetadata for controller to mitigate a timeout issue using IMDS
3✔
239
                        // https://github.com/kubernetes-sigs/azuredisk-csi-driver/issues/168
3✔
240
                        klog.V(2).Infof("disable UseInstanceMetadata for controller")
3✔
241
                        driver.cloud.Config.UseInstanceMetadata = false
3✔
242

3✔
243
                        if driver.cloud.VMType == azurecloudconsts.VMTypeStandard && driver.cloud.DisableAvailabilitySetNodes {
3✔
244
                                klog.V(2).Infof("set DisableAvailabilitySetNodes as false since VMType is %s", driver.cloud.VMType)
×
245
                                driver.cloud.DisableAvailabilitySetNodes = false
×
246
                        }
×
247

248
                        if driver.cloud.VMType == azurecloudconsts.VMTypeVMSS && !driver.cloud.DisableAvailabilitySetNodes && driver.disableAVSetNodes {
3✔
249
                                klog.V(2).Infof("DisableAvailabilitySetNodes for controller since current VMType is vmss")
×
250
                                driver.cloud.DisableAvailabilitySetNodes = true
×
251
                        }
×
252
                        klog.V(2).Infof("cloud: %s, location: %s, rg: %s, VMType: %s, PrimaryScaleSetName: %s, PrimaryAvailabilitySetName: %s, DisableAvailabilitySetNodes: %v", driver.cloud.Cloud, driver.cloud.Location, driver.cloud.ResourceGroup, driver.cloud.VMType, driver.cloud.PrimaryScaleSetName, driver.cloud.PrimaryAvailabilitySetName, driver.cloud.DisableAvailabilitySetNodes)
3✔
253
                }
254

255
                if driver.vmssCacheTTLInSeconds > 0 {
5✔
256
                        klog.V(2).Infof("reset vmssCacheTTLInSeconds as %d", driver.vmssCacheTTLInSeconds)
2✔
257
                        driver.cloud.VMCacheTTLInSeconds = int(driver.vmssCacheTTLInSeconds)
2✔
258
                        driver.cloud.VmssCacheTTLInSeconds = int(driver.vmssCacheTTLInSeconds)
2✔
259
                }
2✔
260

261
                driver.diskController = NewManagedDiskController(driver.cloud)
3✔
262
                driver.diskController.DisableUpdateCache = driver.disableUpdateCache
3✔
263
                driver.diskController.AttachDetachInitialDelayInMs = int(driver.attachDetachInitialDelayInMs)
3✔
264
                driver.diskController.ForceDetachBackoff = driver.forceDetachBackoff
3✔
265
                driver.diskController.WaitForDetach = driver.waitForDetach
3✔
266
                driver.diskController.CheckDiskCountForBatching = driver.checkDiskCountForBatching
3✔
267
        }
268

269
        driver.deviceHelper = optimization.NewSafeDeviceHelper()
3✔
270

3✔
271
        if driver.getPerfOptimizationEnabled() {
5✔
272
                driver.nodeInfo, err = optimization.NewNodeInfo(context.TODO(), driver.getCloud(), driver.NodeID)
2✔
273
                if err != nil {
4✔
274
                        klog.Warningf("Failed to get node info. Error: %v", err)
2✔
275
                }
2✔
276
        }
277

278
        driver.mounter, err = mounter.NewSafeMounter(driver.enableWindowsHostProcess, driver.useCSIProxyGAInterface, int(driver.maxConcurrentFormat), time.Duration(driver.concurrentFormatTimeout)*time.Second)
3✔
279
        if err != nil {
3✔
280
                klog.Fatalf("Failed to get safe mounter. Error: %v", err)
×
281
        }
×
282

283
        controllerCap := []csi.ControllerServiceCapability_RPC_Type{
3✔
284
                csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
3✔
285
                csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
3✔
286
                csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
3✔
287
                csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
3✔
288
                csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
3✔
289
                csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
3✔
290
                csi.ControllerServiceCapability_RPC_MODIFY_VOLUME,
3✔
291
        }
3✔
292
        if driver.enableListVolumes {
5✔
293
                controllerCap = append(controllerCap, csi.ControllerServiceCapability_RPC_LIST_VOLUMES, csi.ControllerServiceCapability_RPC_LIST_VOLUMES_PUBLISHED_NODES)
2✔
294
        }
2✔
295
        if driver.enableListSnapshots {
5✔
296
                controllerCap = append(controllerCap, csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS)
2✔
297
        }
2✔
298

299
        driver.AddControllerServiceCapabilities(controllerCap)
3✔
300
        driver.AddVolumeCapabilityAccessModes(
3✔
301
                []csi.VolumeCapability_AccessMode_Mode{
3✔
302
                        csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
3✔
303
                        csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
3✔
304
                        csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
3✔
305
                        csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER,
3✔
306
                })
3✔
307
        driver.AddNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{
3✔
308
                csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
3✔
309
                csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
3✔
310
                csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
3✔
311
                csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
3✔
312
        })
3✔
313

3✔
314
        if kubeClient != nil && driver.removeNotReadyTaint && driver.NodeID != "" {
3✔
315
                // Remove taint from node to indicate driver startup success
×
316
                // This is done at the last possible moment to prevent race conditions or false positive removals
×
317
                time.AfterFunc(taintRemovalInitialDelay, func() {
×
318
                        removeTaintInBackground(kubeClient, driver.NodeID, driver.Name, taintRemovalBackoff, removeNotReadyTaint)
×
319
                })
×
320
        }
321
        return &driver
3✔
322
}
323

324
// Run driver initialization
325
func (d *Driver) Run(ctx context.Context) error {
5✔
326
        versionMeta, err := GetVersionYAML(d.Name)
5✔
327
        if err != nil {
5✔
328
                klog.Fatalf("%v", err)
×
329
        }
×
330
        klog.Infof("\nDRIVER INFORMATION:\n-------------------\n%s\n\nStreaming logs below:", versionMeta)
5✔
331

5✔
332
        grpcInterceptor := grpc.UnaryInterceptor(csicommon.LogGRPC)
5✔
333
        opts := []grpc.ServerOption{
5✔
334
                grpcInterceptor,
5✔
335
        }
5✔
336
        if d.enableOtelTracing {
5✔
337
                exporter, err := InitOtelTracing()
×
338
                if err != nil {
×
339
                        klog.Fatalf("Failed to initialize otel tracing: %v", err)
×
340
                }
×
341
                // Exporter will flush traces on shutdown
342
                defer func() {
×
343
                        if err := exporter.Shutdown(context.Background()); err != nil {
×
344
                                klog.Errorf("Could not shutdown otel exporter: %v", err)
×
345
                        }
×
346
                }()
347
                opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler()))
×
348
        }
349

350
        s := grpc.NewServer(opts...)
5✔
351
        csi.RegisterIdentityServer(s, d)
5✔
352
        csi.RegisterControllerServer(s, d)
5✔
353
        csi.RegisterNodeServer(s, d)
5✔
354

5✔
355
        go func() {
10✔
356
                //graceful shutdown
5✔
357
                <-ctx.Done()
5✔
358
                s.GracefulStop()
5✔
359
        }()
5✔
360
        // Driver d act as IdentityServer, ControllerServer and NodeServer
361
        listener, err := csicommon.Listen(ctx, d.endpoint)
5✔
362
        if err != nil {
5✔
363
                klog.Fatalf("failed to listen to endpoint, error: %v", err)
×
364
        }
×
365
        err = s.Serve(listener)
5✔
366
        if errors.Is(err, grpc.ErrServerStopped) {
5✔
UNCOV
367
                klog.Infof("gRPC server stopped serving")
×
UNCOV
368
                return nil
×
UNCOV
369
        }
×
370
        return err
5✔
371
}
372

373
func (d *Driver) isGetDiskThrottled() bool {
18✔
374
        cache, err := d.throttlingCache.Get(consts.GetDiskThrottlingKey, azcache.CacheReadTypeDefault)
18✔
375
        if err != nil {
18✔
376
                klog.Warningf("throttlingCache(%s) return with error: %s", consts.GetDiskThrottlingKey, err)
×
377
                return false
×
378
        }
×
379
        return cache != nil
18✔
380
}
381

382
func (d *Driver) isCheckDiskLunThrottled() bool {
×
383
        cache, err := d.checkDiskLunThrottlingCache.Get(consts.CheckDiskLunThrottlingKey, azcache.CacheReadTypeDefault)
×
384
        if err != nil {
×
385
                klog.Warningf("throttlingCache(%s) return with error: %s", consts.CheckDiskLunThrottlingKey, err)
×
386
                return false
×
387
        }
×
388
        return cache != nil
×
389
}
390

391
func (d *Driver) checkDiskExists(ctx context.Context, diskURI string) (*armcompute.Disk, error) {
13✔
392
        diskName, err := azureutils.GetDiskName(diskURI)
13✔
393
        if err != nil {
16✔
394
                return nil, err
3✔
395
        }
3✔
396

397
        resourceGroup, err := azureutils.GetResourceGroupFromURI(diskURI)
10✔
398
        if err != nil {
10✔
399
                return nil, err
×
400
        }
×
401

402
        if d.isGetDiskThrottled() {
11✔
403
                klog.Warningf("skip checkDiskExists(%s) since it's still in throttling", diskURI)
1✔
404
                return nil, nil
1✔
405
        }
1✔
406
        subsID := azureutils.GetSubscriptionIDFromURI(diskURI)
9✔
407
        diskClient, err := d.diskController.clientFactory.GetDiskClientForSub(subsID)
9✔
408
        if err != nil {
9✔
409
                return nil, err
×
410
        }
×
411

412
        newCtx, cancel := context.WithTimeout(ctx, time.Duration(d.getDiskTimeoutInSeconds)*time.Second)
9✔
413
        defer cancel()
9✔
414

9✔
415
        disk, err := diskClient.Get(newCtx, resourceGroup, diskName)
9✔
416
        if err != nil {
9✔
417
                return nil, err
×
418
        }
×
419
        return disk, nil
9✔
420
}
421

422
func (d *Driver) checkDiskCapacity(ctx context.Context, subsID, resourceGroup, diskName string, requestGiB int) (bool, error) {
3✔
423
        if d.isGetDiskThrottled() {
4✔
424
                klog.Warningf("skip checkDiskCapacity(%s, %s) since it's still in throttling", resourceGroup, diskName)
1✔
425
                return true, nil
1✔
426
        }
1✔
427
        diskClient, err := d.clientFactory.GetDiskClientForSub(subsID)
2✔
428
        if err != nil {
2✔
429
                return false, err
×
430
        }
×
431
        disk, err := diskClient.Get(ctx, resourceGroup, diskName)
2✔
432
        // Because we can not judge the reason of the error. Maybe the disk does not exist.
2✔
433
        // So here we do not handle the error.
2✔
434
        if err == nil {
4✔
435
                if !reflect.DeepEqual(disk, armcompute.Disk{}) && disk.Properties.DiskSizeGB != nil && int(*disk.Properties.DiskSizeGB) != requestGiB {
3✔
436
                        return false, status.Errorf(codes.AlreadyExists, "the request volume already exists, but its capacity(%v) is different from (%v)", *disk.Properties.DiskSizeGB, requestGiB)
1✔
437
                }
1✔
438
        }
439
        return true, nil
1✔
440
}
441

442
func (d *Driver) getVolumeLocks() *volumehelper.VolumeLocks {
4✔
443
        return d.volumeLocks
4✔
444
}
4✔
445

446
// setControllerCapabilities sets the controller capabilities field. It is intended for use with unit tests.
447
func (d *DriverCore) setControllerCapabilities(caps []*csi.ControllerServiceCapability) {
3✔
448
        d.Cap = caps
3✔
449
}
3✔
450

451
// setNodeCapabilities sets the node capabilities field. It is intended for use with unit tests.
452
func (d *DriverCore) setNodeCapabilities(nodeCaps []*csi.NodeServiceCapability) {
1✔
453
        d.NSCap = nodeCaps
1✔
454
}
1✔
455

456
// setName sets the Name field. It is intended for use with unit tests.
457
func (d *DriverCore) setName(name string) {
1✔
458
        d.Name = name
1✔
459
}
1✔
460

461
// setName sets the NodeId field. It is intended for use with unit tests.
462
func (d *DriverCore) setNodeID(nodeID string) {
1✔
463
        d.NodeID = nodeID
1✔
464
}
1✔
465

466
// setName sets the Version field. It is intended for use with unit tests.
467
func (d *DriverCore) setVersion(version string) {
1✔
468
        d.Version = version
1✔
469
}
1✔
470

471
// getCloud returns the value of the cloud field. It is intended for use with unit tests.
472
func (d *DriverCore) getCloud() *azure.Cloud {
43✔
473
        return d.cloud
43✔
474
}
43✔
475

476
// setCloud sets the cloud field. It is intended for use with unit tests.
477
func (d *DriverCore) setCloud(cloud *azure.Cloud) {
11✔
478
        d.cloud = cloud
11✔
479
}
11✔
480

481
// getMounter returns the value of the mounter field. It is intended for use with unit tests.
482
func (d *DriverCore) getMounter() *mount.SafeFormatAndMount {
4✔
483
        return d.mounter
4✔
484
}
4✔
485

486
// setMounter sets the mounter field. It is intended for use with unit tests.
487
func (d *DriverCore) setMounter(mounter *mount.SafeFormatAndMount) {
28✔
488
        d.mounter = mounter
28✔
489
}
28✔
490

491
// getPerfOptimizationEnabled returns the value of the perfOptimizationEnabled field. It is intended for use with unit tests.
492
func (d *DriverCore) getPerfOptimizationEnabled() bool {
18✔
493
        return d.perfOptimizationEnabled
18✔
494
}
18✔
495

496
// setPerfOptimizationEnabled sets the value of the perfOptimizationEnabled field. It is intended for use with unit tests.
497
func (d *DriverCore) setPerfOptimizationEnabled(enabled bool) {
9✔
498
        d.perfOptimizationEnabled = enabled
9✔
499
}
9✔
500

501
// getDeviceHelper returns the value of the deviceHelper field. It is intended for use with unit tests.
502
func (d *DriverCore) getDeviceHelper() optimization.Interface {
8✔
503
        return d.deviceHelper
8✔
504
}
8✔
505

506
// getNodeInfo returns the value of the nodeInfo field. It is intended for use with unit tests.
507
func (d *DriverCore) getNodeInfo() *optimization.NodeInfo {
2✔
508
        return d.nodeInfo
2✔
509
}
2✔
510

511
func (d *DriverCore) getHostUtil() hostUtil {
11✔
512
        return d.hostUtil
11✔
513
}
11✔
514

515
// getSnapshotCompletionPercent returns the completion percent of snapshot
516
func (d *DriverCore) getSnapshotCompletionPercent(ctx context.Context, subsID, resourceGroup, snapshotName string) (float32, error) {
15✔
517
        snapshotClient, err := d.clientFactory.GetSnapshotClientForSub(subsID)
15✔
518
        if err != nil {
15✔
519
                return 0.0, err
×
520
        }
×
521
        copySnapshot, err := snapshotClient.Get(ctx, resourceGroup, snapshotName)
15✔
522
        if err != nil {
17✔
523
                return 0.0, err
2✔
524
        }
2✔
525

526
        if copySnapshot.Properties == nil || copySnapshot.Properties.CompletionPercent == nil {
14✔
527
                // If CompletionPercent is nil, it means the snapshot is complete
1✔
528
                klog.V(2).Infof("snapshot(%s) under rg(%s) has no SnapshotProperties or CompletionPercent is nil", snapshotName, resourceGroup)
1✔
529
                return 100.0, nil
1✔
530
        }
1✔
531

532
        return *copySnapshot.Properties.CompletionPercent, nil
12✔
533
}
534

535
// waitForSnapshotReady wait for completionPercent of snapshot is 100.0
536
func (d *DriverCore) waitForSnapshotReady(ctx context.Context, subsID, resourceGroup, snapshotName string, intervel, timeout time.Duration) error {
5✔
537
        completionPercent, err := d.getSnapshotCompletionPercent(ctx, subsID, resourceGroup, snapshotName)
5✔
538
        if err != nil {
7✔
539
                return err
2✔
540
        }
2✔
541

542
        if completionPercent >= float32(100.0) {
5✔
543
                klog.V(2).Infof("snapshot(%s) under rg(%s) complete", snapshotName, resourceGroup)
2✔
544
                return nil
2✔
545
        }
2✔
546

547
        timeTick := time.Tick(intervel)
1✔
548
        timeAfter := time.After(timeout)
1✔
549
        for {
12✔
550
                select {
11✔
551
                case <-timeTick:
10✔
552
                        completionPercent, err = d.getSnapshotCompletionPercent(ctx, subsID, resourceGroup, snapshotName)
10✔
553
                        if err != nil {
10✔
554
                                return err
×
555
                        }
×
556

557
                        if completionPercent >= float32(100.0) {
10✔
558
                                klog.V(2).Infof("snapshot(%s) under rg(%s) complete", snapshotName, resourceGroup)
×
559
                                return nil
×
560
                        }
×
561
                        klog.V(2).Infof("snapshot(%s) under rg(%s) completionPercent: %f", snapshotName, resourceGroup, completionPercent)
10✔
562
                case <-timeAfter:
1✔
563
                        return fmt.Errorf("timeout waiting for snapshot(%s) under rg(%s)", snapshotName, resourceGroup)
1✔
564
                }
565
        }
566
}
567

568
// getUsedLunsFromVolumeAttachments returns a list of used luns from VolumeAttachments
569
func (d *DriverCore) getUsedLunsFromVolumeAttachments(ctx context.Context, nodeName string) ([]int, error) {
1✔
570
        kubeClient := d.cloud.KubeClient
1✔
571
        if kubeClient == nil || kubeClient.StorageV1() == nil || kubeClient.StorageV1().VolumeAttachments() == nil {
2✔
572
                return nil, fmt.Errorf("kubeClient or kubeClient.StorageV1() or kubeClient.StorageV1().VolumeAttachments() is nil")
1✔
573
        }
1✔
574

575
        volumeAttachments, err := kubeClient.StorageV1().VolumeAttachments().List(ctx, metav1.ListOptions{
×
576
                TimeoutSeconds: pointer.Int64Ptr(2)})
×
577
        if err != nil {
×
578
                return nil, err
×
579
        }
×
580

581
        usedLuns := make([]int, 0)
×
582
        if volumeAttachments == nil {
×
583
                klog.V(2).Infof("volumeAttachments is nil")
×
584
                return usedLuns, nil
×
585
        }
×
586

587
        klog.V(2).Infof("volumeAttachments count: %d, nodeName: %s", len(volumeAttachments.Items), nodeName)
×
588
        for _, va := range volumeAttachments.Items {
×
589
                klog.V(6).Infof("attacher: %s, nodeName: %s, Status: %v, PV: %s, attachmentMetadata: %v", va.Spec.Attacher, va.Spec.NodeName,
×
590
                        va.Status.Attached, pointer.StringDeref(va.Spec.Source.PersistentVolumeName, ""), va.Status.AttachmentMetadata)
×
591
                if va.Spec.Attacher == d.Name && strings.EqualFold(va.Spec.NodeName, nodeName) && va.Status.Attached {
×
592
                        if k, ok := va.Status.AttachmentMetadata[consts.LUN]; ok {
×
593
                                lun, err := strconv.Atoi(k)
×
594
                                if err != nil {
×
595
                                        klog.Warningf("VolumeAttachment(%s) lun(%s) is not a valid integer", va.Name, k)
×
596
                                        continue
×
597
                                }
598
                                usedLuns = append(usedLuns, lun)
×
599
                        }
600
                }
601
        }
602
        return usedLuns, nil
×
603
}
604

605
// getUsedLunsFromNode returns a list of sorted used luns from Node
606
func (d *DriverCore) getUsedLunsFromNode(nodeName types.NodeName) ([]int, error) {
1✔
607
        disks, _, err := d.diskController.GetNodeDataDisks(nodeName, azcache.CacheReadTypeDefault)
1✔
608
        if err != nil {
1✔
609
                klog.Errorf("error of getting data disks for node %s: %v", nodeName, err)
×
610
                return nil, err
×
611
        }
×
612

613
        usedLuns := make([]int, 0)
1✔
614
        // get all disks attached to the node
1✔
615
        for _, disk := range disks {
3✔
616
                if disk.Lun == nil {
2✔
617
                        klog.Warningf("disk(%s) lun is nil", *disk.Name)
×
618
                        continue
×
619
                }
620
                usedLuns = append(usedLuns, int(*disk.Lun))
2✔
621
        }
622
        return usedLuns, nil
1✔
623
}
624

625
// getNodeInfoFromLabels get zone, instanceType from node labels
626
func getNodeInfoFromLabels(ctx context.Context, nodeName string, kubeClient clientset.Interface) (string, string, error) {
2✔
627
        if kubeClient == nil || kubeClient.CoreV1() == nil {
4✔
628
                return "", "", fmt.Errorf("kubeClient is nil")
2✔
629
        }
2✔
630

631
        node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
×
632
        if err != nil {
×
633
                return "", "", fmt.Errorf("get node(%s) failed with %v", nodeName, err)
×
634
        }
×
635

636
        if len(node.Labels) == 0 {
×
637
                return "", "", fmt.Errorf("node(%s) label is empty", nodeName)
×
638
        }
×
639
        return node.Labels[consts.WellKnownTopologyKey], node.Labels[consts.InstanceTypeKey], nil
×
640
}
641

642
// getDefaultDiskIOPSReadWrite according to requestGiB
643
//
644
//        ref: https://docs.microsoft.com/en-us/azure/virtual-machines/disks-types#ultra-disk-iops
645
func getDefaultDiskIOPSReadWrite(requestGiB int) int {
9✔
646
        iops := azurecloudconsts.DefaultDiskIOPSReadWrite
9✔
647
        if requestGiB > iops {
16✔
648
                iops = requestGiB
7✔
649
        }
7✔
650
        if iops > 160000 {
12✔
651
                iops = 160000
3✔
652
        }
3✔
653
        return iops
9✔
654
}
655

656
// getDefaultDiskMBPSReadWrite according to requestGiB
657
//
658
//        ref: https://docs.microsoft.com/en-us/azure/virtual-machines/disks-types#ultra-disk-throughput
659
func getDefaultDiskMBPSReadWrite(requestGiB int) int {
6✔
660
        bandwidth := azurecloudconsts.DefaultDiskMBpsReadWrite
6✔
661
        iops := getDefaultDiskIOPSReadWrite(requestGiB)
6✔
662
        if iops/256 > bandwidth {
10✔
663
                bandwidth = int(volumehelper.RoundUpSize(int64(iops), 256))
4✔
664
        }
4✔
665
        if bandwidth > iops/4 {
6✔
666
                bandwidth = int(volumehelper.RoundUpSize(int64(iops), 4))
×
667
        }
×
668
        if bandwidth > 4000 {
6✔
669
                bandwidth = 4000
×
670
        }
×
671
        return bandwidth
6✔
672
}
673

674
// getVMSSInstanceName get instance name from vmss compute name, e.g. "aks-agentpool-20657377-vmss_2" -> "aks-agentpool-20657377-vmss000002"
675
func getVMSSInstanceName(computeName string) (string, error) {
5✔
676
        names := strings.Split(computeName, "_")
5✔
677
        if len(names) != 2 {
6✔
678
                return "", fmt.Errorf("invalid vmss compute name: %s", computeName)
1✔
679
        }
1✔
680

681
        instanceID, err := strconv.Atoi(names[1])
4✔
682
        if err != nil {
5✔
683
                return "", fmt.Errorf("parsing vmss compute name(%s) failed with %v", computeName, err)
1✔
684
        }
1✔
685
        return fmt.Sprintf("%s%06s", names[0], strconv.FormatInt(int64(instanceID), 36)), nil
3✔
686
}
687

688
// Struct for JSON patch operations
689
type JSONPatch struct {
690
        OP    string      `json:"op,omitempty"`
691
        Path  string      `json:"path,omitempty"`
692
        Value interface{} `json:"value"`
693
}
694

695
// removeTaintInBackground is a goroutine that retries removeNotReadyTaint with exponential backoff
696
func removeTaintInBackground(k8sClient kubernetes.Interface, nodeName, driverName string, backoff wait.Backoff, removalFunc func(kubernetes.Interface, string, string) error) {
×
697
        backoffErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
×
698
                err := removalFunc(k8sClient, nodeName, driverName)
×
699
                if err != nil {
×
700
                        klog.ErrorS(err, "Unexpected failure when attempting to remove node taint(s)")
×
701
                        return false, nil
×
702
                }
×
703
                return true, nil
×
704
        })
705

706
        if backoffErr != nil {
×
707
                klog.ErrorS(backoffErr, "Retries exhausted, giving up attempting to remove node taint(s)")
×
708
        }
×
709
}
710

711
// removeNotReadyTaint removes the taint disk.csi.azure.com/agent-not-ready from the local node
712
// This taint can be optionally applied by users to prevent startup race conditions such as
713
// https://github.com/kubernetes/kubernetes/issues/95911
714
func removeNotReadyTaint(clientset kubernetes.Interface, nodeName, driverName string) error {
×
715
        ctx := context.Background()
×
716
        node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
×
717
        if err != nil {
×
718
                return err
×
719
        }
×
720

721
        if err := checkAllocatable(ctx, clientset, nodeName, driverName); err != nil {
×
722
                return err
×
723
        }
×
724

725
        taintKeyToRemove := driverName + consts.AgentNotReadyNodeTaintKeySuffix
×
726
        klog.V(2).Infof("removing taint with key %s from local node %s", taintKeyToRemove, nodeName)
×
727
        var taintsToKeep []corev1.Taint
×
728
        for _, taint := range node.Spec.Taints {
×
729
                klog.V(5).Infof("checking taint key %s, value %s, effect %s", taint.Key, taint.Value, taint.Effect)
×
730
                if taint.Key != taintKeyToRemove {
×
731
                        taintsToKeep = append(taintsToKeep, taint)
×
732
                } else {
×
733
                        klog.V(2).Infof("queued taint for removal with key %s, effect %s", taint.Key, taint.Effect)
×
734
                }
×
735
        }
736

737
        if len(taintsToKeep) == len(node.Spec.Taints) {
×
738
                klog.V(2).Infof("No taints to remove on node, skipping taint removal")
×
739
                return nil
×
740
        }
×
741

742
        patchRemoveTaints := []JSONPatch{
×
743
                {
×
744
                        OP:    "test",
×
745
                        Path:  "/spec/taints",
×
746
                        Value: node.Spec.Taints,
×
747
                },
×
748
                {
×
749
                        OP:    "replace",
×
750
                        Path:  "/spec/taints",
×
751
                        Value: taintsToKeep,
×
752
                },
×
753
        }
×
754

×
755
        patch, err := json.Marshal(patchRemoveTaints)
×
756
        if err != nil {
×
757
                return err
×
758
        }
×
759

760
        _, err = clientset.CoreV1().Nodes().Patch(ctx, nodeName, k8stypes.JSONPatchType, patch, metav1.PatchOptions{})
×
761
        if err != nil {
×
762
                return err
×
763
        }
×
764
        klog.V(2).Infof("removed taint with key %s from local node %s successfully", taintKeyToRemove, nodeName)
×
765
        return nil
×
766
}
767

768
func checkAllocatable(ctx context.Context, clientset kubernetes.Interface, nodeName, driverName string) error {
×
769
        csiNode, err := clientset.StorageV1().CSINodes().Get(ctx, nodeName, metav1.GetOptions{})
×
770
        if err != nil {
×
771
                return fmt.Errorf("isAllocatableSet: failed to get CSINode for %s: %w", nodeName, err)
×
772
        }
×
773

774
        for _, driver := range csiNode.Spec.Drivers {
×
775
                if driver.Name == driverName {
×
776
                        if driver.Allocatable != nil && driver.Allocatable.Count != nil {
×
777
                                klog.V(2).Infof("CSINode Allocatable value is set for driver on node %s, count %d", nodeName, *driver.Allocatable.Count)
×
778
                                return nil
×
779
                        }
×
780
                        return fmt.Errorf("isAllocatableSet: allocatable value not set for driver on node %s", nodeName)
×
781
                }
782
        }
783

784
        return fmt.Errorf("isAllocatableSet: driver not found on node %s", nodeName)
×
785
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc