• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 13386305256

18 Feb 2025 08:27AM UTC coverage: 77.861%. Remained the same
13386305256

Pull #1839

github

andyzhangx
feat: optimize azcopy perf in volume cloning scenario
Pull Request #1839: feat: optimize azcopy perf in volume cloning scenario

2286 of 2936 relevant lines covered (77.86%)

7.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.71
/pkg/blob/nodeserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "fmt"
21
        "io/fs"
22
        "os"
23
        "os/exec"
24
        "path/filepath"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        volumehelper "sigs.k8s.io/blob-csi-driver/pkg/util"
30
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
31
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
32

33
        "github.com/container-storage-interface/spec/lib/go/csi"
34

35
        "k8s.io/klog/v2"
36
        "k8s.io/kubernetes/pkg/volume"
37
        "k8s.io/kubernetes/pkg/volume/util"
38
        mount "k8s.io/mount-utils"
39

40
        "google.golang.org/grpc/codes"
41
        "google.golang.org/grpc/status"
42

43
        "golang.org/x/net/context"
44
        "google.golang.org/grpc"
45
        mount_azure_blob "sigs.k8s.io/blob-csi-driver/pkg/blobfuse-proxy/pb"
46
)
47

48
// Polling parameters handed to waitForMount after a blobfuse mount has been
// issued in NodeStageVolume.
const (
	// waitForMountInterval is the interval argument passed to waitForMount.
	waitForMountInterval = 20 * time.Millisecond
	// waitForMountTimeout is the timeout argument passed to waitForMount.
	waitForMountTimeout = time.Minute
)
52

53
type MountClient struct {
54
        service mount_azure_blob.MountServiceClient
55
}
56

57
// NewMountClient returns a new mount client
58
func NewMountClient(cc *grpc.ClientConn) *MountClient {
1✔
59
        service := mount_azure_blob.NewMountServiceClient(cc)
1✔
60
        return &MountClient{service}
1✔
61
}
1✔
62

63
// NodePublishVolume mount the volume from staging to target path
64
func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
8✔
65
        volCap := req.GetVolumeCapability()
8✔
66
        if volCap == nil {
9✔
67
                return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
1✔
68
        }
1✔
69
        volumeID := req.GetVolumeId()
7✔
70
        if len(req.GetVolumeId()) == 0 {
8✔
71
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
72
        }
1✔
73

74
        target := req.GetTargetPath()
6✔
75
        if len(target) == 0 {
7✔
76
                return nil, status.Error(codes.InvalidArgument, "Target path not provided")
1✔
77
        }
1✔
78

79
        mountPermissions := d.mountPermissions
5✔
80
        context := req.GetVolumeContext()
5✔
81
        if context != nil {
7✔
82
                // token request
2✔
83
                if context[serviceAccountTokenField] != "" && getValueInMap(context, clientIDField) != "" {
2✔
84
                        klog.V(2).Infof("NodePublishVolume: volume(%s) mount on %s with service account token, clientID: %s", volumeID, target, getValueInMap(context, clientIDField))
×
85
                        _, err := d.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
×
86
                                StagingTargetPath: target,
×
87
                                VolumeContext:     context,
×
88
                                VolumeCapability:  volCap,
×
89
                                VolumeId:          volumeID,
×
90
                        })
×
91
                        return &csi.NodePublishVolumeResponse{}, err
×
92
                }
×
93

94
                // ephemeral volume
95
                if strings.EqualFold(context[ephemeralField], trueValue) {
2✔
96
                        setKeyValueInMap(context, secretNamespaceField, context[podNamespaceField])
×
97
                        if !d.allowInlineVolumeKeyAccessWithIdentity {
×
98
                                // only get storage account from secret
×
99
                                setKeyValueInMap(context, getAccountKeyFromSecretField, trueValue)
×
100
                                setKeyValueInMap(context, storageAccountField, "")
×
101
                        }
×
102
                        klog.V(2).Infof("NodePublishVolume: ephemeral volume(%s) mount on %s", volumeID, target)
×
103
                        _, err := d.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
×
104
                                StagingTargetPath: target,
×
105
                                VolumeContext:     context,
×
106
                                VolumeCapability:  volCap,
×
107
                                VolumeId:          volumeID,
×
108
                        })
×
109
                        return &csi.NodePublishVolumeResponse{}, err
×
110
                }
111

112
                if perm := getValueInMap(context, mountPermissionsField); perm != "" {
4✔
113
                        var err error
2✔
114
                        if mountPermissions, err = strconv.ParseUint(perm, 8, 32); err != nil {
3✔
115
                                return nil, status.Errorf(codes.InvalidArgument, "invalid mountPermissions %s", perm)
1✔
116
                        }
1✔
117
                }
118
        }
119

120
        source := req.GetStagingTargetPath()
4✔
121
        if len(source) == 0 {
5✔
122
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
123
        }
1✔
124

125
        mountOptions := []string{"bind"}
3✔
126
        if req.GetReadonly() {
6✔
127
                mountOptions = append(mountOptions, "ro")
3✔
128
        }
3✔
129

130
        mnt, err := d.ensureMountPoint(target, fs.FileMode(mountPermissions))
3✔
131
        if err != nil {
4✔
132
                return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", target, err)
1✔
133
        }
1✔
134
        if mnt {
2✔
135
                klog.V(2).Infof("NodePublishVolume: volume %s is already mounted on %s", volumeID, target)
×
136
                return &csi.NodePublishVolumeResponse{}, nil
×
137
        }
×
138

139
        klog.V(2).Infof("NodePublishVolume: volume %s mounting %s at %s with mountOptions: %v", volumeID, source, target, mountOptions)
2✔
140
        if d.enableBlobMockMount {
2✔
141
                klog.Warningf("NodePublishVolume: mock mount on volumeID(%s), this is only for TESTING!!!", volumeID)
×
142
                if err := volumehelper.MakeDir(target, os.FileMode(mountPermissions)); err != nil {
×
143
                        klog.Errorf("MakeDir failed on target: %s (%v)", target, err)
×
144
                        return nil, status.Errorf(codes.Internal, "%v", err)
×
145
                }
×
146
                return &csi.NodePublishVolumeResponse{}, nil
×
147
        }
148

149
        if err := d.mounter.Mount(source, target, "", mountOptions); err != nil {
2✔
150
                if removeErr := os.Remove(target); removeErr != nil {
×
151
                        return nil, status.Errorf(codes.Internal, "Could not remove mount target %q: %v", target, removeErr)
×
152
                }
×
153
                return nil, status.Errorf(codes.Internal, "Could not mount %q at %q: %v", source, target, err)
×
154
        }
155
        klog.V(2).Infof("NodePublishVolume: volume %s mount %s at %s successfully", volumeID, source, target)
2✔
156

2✔
157
        return &csi.NodePublishVolumeResponse{}, nil
2✔
158
}
159

160
func (d *Driver) mountBlobfuseWithProxy(args, protocol string, authEnv []string) (string, error) {
1✔
161
        var resp *mount_azure_blob.MountAzureBlobResponse
1✔
162
        var output string
1✔
163
        connectionTimout := time.Duration(d.blobfuseProxyConnTimout) * time.Second
1✔
164
        ctx, cancel := context.WithTimeout(context.Background(), connectionTimout)
1✔
165
        defer cancel()
1✔
166
        klog.V(2).Infof("start connecting to blobfuse proxy, protocol: %s, args: %s", protocol, args)
1✔
167
        conn, err := grpc.DialContext(ctx, d.blobfuseProxyEndpoint, grpc.WithInsecure(), grpc.WithBlock())
1✔
168
        if err == nil {
1✔
169
                mountClient := NewMountClient(conn)
×
170
                mountreq := mount_azure_blob.MountAzureBlobRequest{
×
171
                        MountArgs: args,
×
172
                        Protocol:  protocol,
×
173
                        AuthEnv:   authEnv,
×
174
                }
×
175
                klog.V(2).Infof("begin to mount with blobfuse proxy, protocol: %s, args: %s", protocol, args)
×
176
                resp, err = mountClient.service.MountAzureBlob(context.TODO(), &mountreq)
×
177
                if err != nil {
×
178
                        klog.Error("GRPC call returned with an error:", err)
×
179
                }
×
180
                output = resp.GetOutput()
×
181
        }
182
        return output, err
1✔
183
}
184

185
func (d *Driver) mountBlobfuseInsideDriver(args string, protocol string, authEnv []string) (string, error) {
1✔
186
        var cmd *exec.Cmd
1✔
187

1✔
188
        args = volumehelper.TrimDuplicatedSpace(args)
1✔
189

1✔
190
        mountLog := "mount inside driver with"
1✔
191
        if protocol == Fuse2 {
1✔
192
                mountLog += " v2"
×
193
                args = "mount " + args
×
194
                cmd = exec.Command("blobfuse2", strings.Split(args, " ")...)
×
195
        } else {
1✔
196
                mountLog += " v1"
1✔
197
                cmd = exec.Command("blobfuse", strings.Split(args, " ")...)
1✔
198
        }
1✔
199
        klog.V(2).Infof("%s, protocol: %s, args: %s", mountLog, protocol, args)
1✔
200

1✔
201
        cmd.Env = append(os.Environ(), authEnv...)
1✔
202
        output, err := cmd.CombinedOutput()
1✔
203
        klog.V(2).Infof("mount output: %s\n", string(output))
1✔
204

1✔
205
        return string(output), err
1✔
206
}
207

208
// NodeUnpublishVolume unmount the volume from the target path
209
func (d *Driver) NodeUnpublishVolume(_ context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
3✔
210
        volumeID := req.GetVolumeId()
3✔
211
        if len(volumeID) == 0 {
4✔
212
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
213
        }
1✔
214
        targetPath := req.GetTargetPath()
2✔
215
        if len(targetPath) == 0 {
3✔
216
                return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
1✔
217
        }
1✔
218

219
        klog.V(2).Infof("NodeUnpublishVolume: unmounting volume %s on %s", volumeID, targetPath)
1✔
220
        err := mount.CleanupMountPoint(targetPath, d.mounter, true /*extensiveMountPointCheck*/)
1✔
221
        if err != nil {
1✔
222
                return nil, status.Errorf(codes.Internal, "failed to unmount target %q: %v", targetPath, err)
×
223
        }
×
224
        klog.V(2).Infof("NodeUnpublishVolume: unmount volume %s on %s successfully", volumeID, targetPath)
1✔
225

1✔
226
        return &csi.NodeUnpublishVolumeResponse{}, nil
1✔
227
}
228

229
// NodeStageVolume mount the volume to a staging path
230
func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
10✔
231
        volumeID := req.GetVolumeId()
10✔
232
        if len(volumeID) == 0 {
11✔
233
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
234
        }
1✔
235
        targetPath := req.GetStagingTargetPath()
9✔
236
        if len(targetPath) == 0 {
10✔
237
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
238
        }
1✔
239
        volumeCapability := req.GetVolumeCapability()
8✔
240
        if volumeCapability == nil {
9✔
241
                return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
1✔
242
        }
1✔
243

244
        lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
7✔
245
        if acquired := d.volumeLocks.TryAcquire(lockKey); !acquired {
9✔
246
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
2✔
247
        }
2✔
248
        defer d.volumeLocks.Release(lockKey)
5✔
249

5✔
250
        mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
5✔
251
        volumeMountGroup := req.GetVolumeCapability().GetMount().GetVolumeMountGroup()
5✔
252
        attrib := req.GetVolumeContext()
5✔
253
        secrets := req.GetSecrets()
5✔
254

5✔
255
        if getValueInMap(attrib, clientIDField) != "" && attrib[serviceAccountTokenField] == "" {
5✔
256
                klog.V(2).Infof("Skip NodeStageVolume for volume(%s) since clientID %s is provided but service account token is empty", volumeID, getValueInMap(attrib, clientIDField))
×
257
                return &csi.NodeStageVolumeResponse{}, nil
×
258
        }
×
259

260
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_stage_volume", d.cloud.ResourceGroup, "", d.Name)
5✔
261
        isOperationSucceeded := false
5✔
262
        defer func() {
10✔
263
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
5✔
264
        }()
5✔
265

266
        var serverAddress, storageEndpointSuffix, protocol, ephemeralVolMountOptions string
5✔
267
        var ephemeralVol, isHnsEnabled bool
5✔
268

5✔
269
        containerNameReplaceMap := map[string]string{}
5✔
270

5✔
271
        fsGroupChangePolicy := d.fsGroupChangePolicy
5✔
272

5✔
273
        mountPermissions := d.mountPermissions
5✔
274
        performChmodOp := (mountPermissions > 0)
5✔
275
        for k, v := range attrib {
12✔
276
                switch strings.ToLower(k) {
7✔
277
                case serverNameField:
×
278
                        serverAddress = v
×
279
                case protocolField:
2✔
280
                        protocol = v
2✔
281
                case storageEndpointSuffixField:
×
282
                        storageEndpointSuffix = v
×
283
                case ephemeralField:
×
284
                        ephemeralVol = strings.EqualFold(v, trueValue)
×
285
                case mountOptionsField:
×
286
                        ephemeralVolMountOptions = v
×
287
                case isHnsEnabledField:
×
288
                        isHnsEnabled = strings.EqualFold(v, trueValue)
×
289
                case pvcNamespaceKey:
×
290
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
291
                case pvcNameKey:
×
292
                        containerNameReplaceMap[pvcNameMetadata] = v
×
293
                case pvNameKey:
×
294
                        containerNameReplaceMap[pvNameMetadata] = v
×
295
                case mountPermissionsField:
4✔
296
                        if v != "" {
8✔
297
                                var err error
4✔
298
                                var perm uint64
4✔
299
                                if perm, err = strconv.ParseUint(v, 8, 32); err != nil {
5✔
300
                                        return nil, status.Errorf(codes.InvalidArgument, "invalid mountPermissions %s", v)
1✔
301
                                }
1✔
302
                                if perm == 0 {
3✔
303
                                        performChmodOp = false
×
304
                                } else {
3✔
305
                                        mountPermissions = perm
3✔
306
                                }
3✔
307
                        }
308
                case fsGroupChangePolicyField:
1✔
309
                        fsGroupChangePolicy = v
1✔
310
                }
311
        }
312

313
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
5✔
314
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
315
        }
1✔
316

317
        mnt, err := d.ensureMountPoint(targetPath, fs.FileMode(mountPermissions))
3✔
318
        if err != nil {
4✔
319
                return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", targetPath, err)
1✔
320
        }
1✔
321
        if mnt {
2✔
322
                klog.V(2).Infof("NodeStageVolume: volume %s is already mounted on %s", volumeID, targetPath)
×
323
                return &csi.NodeStageVolumeResponse{}, nil
×
324
        }
×
325

326
        _, accountName, _, containerName, authEnv, err := d.GetAuthEnv(ctx, volumeID, protocol, attrib, secrets)
2✔
327
        if err != nil {
2✔
328
                return nil, status.Errorf(codes.Internal, "%v", err)
×
329
        }
×
330

331
        // replace pv/pvc name namespace metadata in subDir
332
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
2✔
333

2✔
334
        if strings.TrimSpace(storageEndpointSuffix) == "" {
4✔
335
                storageEndpointSuffix = d.getStorageEndPointSuffix()
2✔
336
        }
2✔
337

338
        if strings.TrimSpace(serverAddress) == "" {
4✔
339
                // server address is "accountname.blob.core.windows.net" by default
2✔
340
                serverAddress = fmt.Sprintf("%s.blob.%s", accountName, storageEndpointSuffix)
2✔
341
        }
2✔
342

343
        if isReadOnlyFromCapability(volumeCapability) {
2✔
344
                if isNFSProtocol(protocol) {
×
345
                        mountFlags = util.JoinMountOptions(mountFlags, []string{"ro"})
×
346
                } else {
×
347
                        mountFlags = util.JoinMountOptions(mountFlags, []string{"-o ro"})
×
348
                }
×
349
                klog.V(2).Infof("CSI volume is read-only, mounting with extra option ro")
×
350
        }
351

352
        if isNFSProtocol(protocol) {
3✔
353
                klog.V(2).Infof("target %v\nprotocol %v\n\nvolumeId %v\nmountflags %v\nserverAddress %v",
1✔
354
                        targetPath, protocol, volumeID, mountFlags, serverAddress)
1✔
355

1✔
356
                mountType := AZNFS
1✔
357
                if !d.enableAznfsMount || protocol == NFSv3 {
2✔
358
                        mountType = NFS
1✔
359
                }
1✔
360

361
                if storageEndpointSuffix != "" && mountType == AZNFS {
1✔
362
                        aznfsEndpoint := strings.Replace(storageEndpointSuffix, "core.", "", 1)
×
363
                        klog.V(2).Infof("set AZURE_ENDPOINT_OVERRIDE to %s", aznfsEndpoint)
×
364
                        os.Setenv("AZURE_ENDPOINT_OVERRIDE", aznfsEndpoint)
×
365
                }
×
366

367
                source := fmt.Sprintf("%s:/%s/%s", serverAddress, accountName, containerName)
1✔
368
                mountOptions := util.JoinMountOptions(mountFlags, []string{"sec=sys,vers=3,nolock"})
1✔
369
                execFunc := func() error { return d.mounter.MountSensitive(source, targetPath, mountType, mountOptions, []string{}) }
2✔
370
                timeoutFunc := func() error { return fmt.Errorf("time out") }
1✔
371
                if err := volumehelper.WaitUntilTimeout(90*time.Second, execFunc, timeoutFunc); err != nil {
1✔
372
                        var helpLinkMsg string
×
373
                        if d.appendMountErrorHelpLink {
×
374
                                helpLinkMsg = "\nPlease refer to http://aka.ms/blobmounterror for possible causes and solutions for mount errors."
×
375
                        }
×
376
                        return nil, status.Error(codes.Internal, fmt.Sprintf("volume(%s) mount %q on %q failed with %v%s", volumeID, source, targetPath, err, helpLinkMsg))
×
377
                }
378

379
                if performChmodOp {
1✔
380
                        if err := chmodIfPermissionMismatch(targetPath, os.FileMode(mountPermissions)); err != nil {
×
381
                                return nil, status.Error(codes.Internal, err.Error())
×
382
                        }
×
383
                } else {
1✔
384
                        klog.V(2).Infof("skip chmod on targetPath(%s) since mountPermissions is set as 0", targetPath)
1✔
385
                }
1✔
386

387
                if volumeMountGroup != "" && fsGroupChangePolicy != FSGroupChangeNone {
1✔
388
                        klog.V(2).Infof("set gid of volume(%s) as %s using fsGroupChangePolicy(%s)", volumeID, volumeMountGroup, fsGroupChangePolicy)
×
389
                        if err := volumehelper.SetVolumeOwnership(targetPath, volumeMountGroup, fsGroupChangePolicy); err != nil {
×
390
                                return nil, status.Error(codes.Internal, fmt.Sprintf("SetVolumeOwnership with volume(%s) on %s failed with %v", volumeID, targetPath, err))
×
391
                        }
×
392
                }
393

394
                isOperationSucceeded = true
1✔
395
                klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, targetPath)
1✔
396
                return &csi.NodeStageVolumeResponse{}, nil
1✔
397
        }
398

399
        // Get mountOptions that the volume will be formatted and mounted with
400
        mountOptions := mountFlags
1✔
401
        if ephemeralVol {
1✔
402
                mountOptions = util.JoinMountOptions(mountOptions, strings.Split(ephemeralVolMountOptions, ","))
×
403
        }
×
404
        if isHnsEnabled {
1✔
405
                mountOptions = util.JoinMountOptions(mountOptions, []string{"--use-adls=true"})
×
406
        }
×
407

408
        if !checkGidPresentInMountFlags(mountFlags) && volumeMountGroup != "" {
1✔
409
                klog.V(2).Infof("append volumeMountGroup %s", volumeMountGroup)
×
410
                mountOptions = append(mountOptions, fmt.Sprintf("-o gid=%s", volumeMountGroup))
×
411
        }
×
412

413
        tmpPath := fmt.Sprintf("%s/%s", "/mnt", volumeID)
1✔
414
        if d.appendTimeStampInCacheDir {
1✔
415
                tmpPath += fmt.Sprintf("#%d", time.Now().Unix())
×
416
        }
×
417
        mountOptions = appendDefaultMountOptions(mountOptions, tmpPath, containerName)
1✔
418

1✔
419
        args := targetPath
1✔
420
        for _, opt := range mountOptions {
7✔
421
                args = args + " " + opt
6✔
422
        }
6✔
423

424
        klog.V(2).Infof("target %v protocol %v volumeId %v\nmountflags %v\nmountOptions %v volumeMountGroup %s\nargs %v\nserverAddress %v",
1✔
425
                targetPath, protocol, volumeID, mountFlags, mountOptions, volumeMountGroup, args, serverAddress)
1✔
426

1✔
427
        authEnv = append(authEnv, "AZURE_STORAGE_ACCOUNT="+accountName, "AZURE_STORAGE_BLOB_ENDPOINT="+serverAddress)
1✔
428
        if d.enableBlobMockMount {
2✔
429
                klog.Warningf("NodeStageVolume: mock mount on volumeID(%s), this is only for TESTING!!!", volumeID)
1✔
430
                if err := volumehelper.MakeDir(targetPath, os.FileMode(mountPermissions)); err != nil {
1✔
431
                        klog.Errorf("MakeDir failed on target: %s (%v)", targetPath, err)
×
432
                        return nil, status.Errorf(codes.Internal, "%v", err)
×
433
                }
×
434
                return &csi.NodeStageVolumeResponse{}, nil
1✔
435
        }
436

437
        var output string
×
438
        if d.enableBlobfuseProxy {
×
439
                output, err = d.mountBlobfuseWithProxy(args, protocol, authEnv)
×
440
        } else {
×
441
                output, err = d.mountBlobfuseInsideDriver(args, protocol, authEnv)
×
442
        }
×
443

444
        if err != nil {
×
445
                var helpLinkMsg string
×
446
                if d.appendMountErrorHelpLink {
×
447
                        helpLinkMsg = "\nPlease refer to http://aka.ms/blobmounterror for possible causes and solutions for mount errors."
×
448
                }
×
449
                err = status.Errorf(codes.Internal, "Mount failed with error: %v, output: %v%s", err, output, helpLinkMsg)
×
450
                klog.Errorf("%v", err)
×
451
                notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
×
452
                if mntErr != nil {
×
453
                        klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
×
454
                        return nil, err
×
455
                }
×
456
                if !notMnt {
×
457
                        if mntErr = d.mounter.Unmount(targetPath); mntErr != nil {
×
458
                                klog.Errorf("Failed to unmount: %v", mntErr)
×
459
                                return nil, err
×
460
                        }
×
461
                        notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
×
462
                        if mntErr != nil {
×
463
                                klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
×
464
                                return nil, err
×
465
                        }
×
466
                        if !notMnt {
×
467
                                // This is very odd, we don't expect it.  We'll try again next sync loop.
×
468
                                klog.Errorf("%s is still mounted, despite call to unmount().  Will try again next sync loop.", targetPath)
×
469
                                return nil, err
×
470
                        }
×
471
                }
472
                os.Remove(targetPath)
×
473
                return nil, err
×
474
        }
475

476
        // wait a few seconds to make sure blobfuse mount is successful
477
        // please refer to https://github.com/Azure/azure-storage-fuse/pull/1088 for more details
478
        if err := waitForMount(targetPath, waitForMountInterval, waitForMountTimeout); err != nil {
×
479
                return nil, fmt.Errorf("failed to wait for mount: %w", err)
×
480
        }
×
481

482
        klog.V(2).Infof("volume(%s) mount on %q succeeded", volumeID, targetPath)
×
483
        return &csi.NodeStageVolumeResponse{}, nil
×
484
}
485

486
// NodeUnstageVolume unmount the volume from the staging path
487
func (d *Driver) NodeUnstageVolume(_ context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
3✔
488
        volumeID := req.GetVolumeId()
3✔
489
        if len(volumeID) == 0 {
4✔
490
                return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
1✔
491
        }
1✔
492

493
        stagingTargetPath := req.GetStagingTargetPath()
2✔
494
        if len(stagingTargetPath) == 0 {
3✔
495
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
496
        }
1✔
497

498
        lockKey := fmt.Sprintf("%s-%s", volumeID, stagingTargetPath)
1✔
499
        if acquired := d.volumeLocks.TryAcquire(lockKey); !acquired {
1✔
500
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
501
        }
×
502
        defer d.volumeLocks.Release(lockKey)
1✔
503

1✔
504
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_unstage_volume", d.cloud.ResourceGroup, "", d.Name)
1✔
505
        isOperationSucceeded := false
1✔
506
        defer func() {
2✔
507
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
1✔
508
        }()
1✔
509

510
        klog.V(2).Infof("NodeUnstageVolume: volume %s unmounting on %s", volumeID, stagingTargetPath)
1✔
511
        err := mount.CleanupMountPoint(stagingTargetPath, d.mounter, true /*extensiveMountPointCheck*/)
1✔
512
        if err != nil {
1✔
513
                return nil, status.Errorf(codes.Internal, "failed to unmount staging target %q: %v", stagingTargetPath, err)
×
514
        }
×
515
        klog.V(2).Infof("NodeUnstageVolume: volume %s unmount on %s successfully", volumeID, stagingTargetPath)
1✔
516

1✔
517
        isOperationSucceeded = true
1✔
518
        return &csi.NodeUnstageVolumeResponse{}, nil
1✔
519
}
520

521
// NodeGetCapabilities return the capabilities of the Node plugin
522
func (d *Driver) NodeGetCapabilities(_ context.Context, _ *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
1✔
523
        return &csi.NodeGetCapabilitiesResponse{
1✔
524
                Capabilities: d.NSCap,
1✔
525
        }, nil
1✔
526
}
1✔
527

528
// NodeGetInfo return info of the node on which this plugin is running
529
func (d *Driver) NodeGetInfo(_ context.Context, _ *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
1✔
530
        return &csi.NodeGetInfoResponse{
1✔
531
                NodeId: d.NodeID,
1✔
532
        }, nil
1✔
533
}
1✔
534

535
// NodeExpandVolume node expand volume
536
func (d *Driver) NodeExpandVolume(_ context.Context, _ *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
1✔
537
        return nil, status.Error(codes.Unimplemented, "NodeExpandVolume is not yet implemented")
1✔
538
}
1✔
539

540
// NodeGetVolumeStats get volume stats
541
func (d *Driver) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
4✔
542
        if len(req.VolumeId) == 0 {
5✔
543
                return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty")
1✔
544
        }
1✔
545
        if len(req.VolumePath) == 0 {
4✔
546
                return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume path was empty")
1✔
547
        }
1✔
548

549
        // check if the volume stats is cached
550
        cache, err := d.volStatsCache.Get(ctx, req.VolumeId, azcache.CacheReadTypeDefault)
2✔
551
        if err != nil {
2✔
552
                return nil, status.Errorf(codes.Internal, "%v", err)
×
553
        }
×
554
        if cache != nil {
2✔
555
                resp := cache.(*csi.NodeGetVolumeStatsResponse)
×
556
                klog.V(6).Infof("NodeGetVolumeStats: volume stats for volume %s path %s is cached", req.VolumeId, req.VolumePath)
×
557
                return resp, nil
×
558
        }
×
559

560
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_get_volume_stats", d.cloud.ResourceGroup, "", d.Name)
2✔
561
        mc.LogLevel = 6 // change log level
2✔
562
        isOperationSucceeded := false
2✔
563
        defer func() {
4✔
564
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, req.VolumeId)
2✔
565
        }()
2✔
566

567
        if _, err := os.Lstat(req.VolumePath); err != nil {
3✔
568
                if os.IsNotExist(err) {
2✔
569
                        return nil, status.Errorf(codes.NotFound, "path %s does not exist", req.VolumePath)
1✔
570
                }
1✔
571
                return nil, status.Errorf(codes.Internal, "failed to stat file %s: %v", req.VolumePath, err)
×
572
        }
573

574
        klog.V(6).Infof("NodeGetVolumeStats: begin to get VolumeStats on volume %s path %s", req.VolumeId, req.VolumePath)
1✔
575

1✔
576
        volumeMetrics, err := volume.NewMetricsStatFS(req.VolumePath).GetMetrics()
1✔
577
        if err != nil {
1✔
578
                return nil, status.Errorf(codes.Internal, "failed to get metrics: %v", err)
×
579
        }
×
580

581
        available, ok := volumeMetrics.Available.AsInt64()
1✔
582
        if !ok {
1✔
583
                return nil, status.Errorf(codes.Internal, "failed to transform volume available size(%v)", volumeMetrics.Available)
×
584
        }
×
585
        capacity, ok := volumeMetrics.Capacity.AsInt64()
1✔
586
        if !ok {
1✔
587
                return nil, status.Errorf(codes.Internal, "failed to transform volume capacity size(%v)", volumeMetrics.Capacity)
×
588
        }
×
589
        used, ok := volumeMetrics.Used.AsInt64()
1✔
590
        if !ok {
1✔
591
                return nil, status.Errorf(codes.Internal, "failed to transform volume used size(%v)", volumeMetrics.Used)
×
592
        }
×
593

594
        inodesFree, ok := volumeMetrics.InodesFree.AsInt64()
1✔
595
        if !ok {
1✔
596
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes free(%v)", volumeMetrics.InodesFree)
×
597
        }
×
598
        inodes, ok := volumeMetrics.Inodes.AsInt64()
1✔
599
        if !ok {
1✔
600
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes(%v)", volumeMetrics.Inodes)
×
601
        }
×
602
        inodesUsed, ok := volumeMetrics.InodesUsed.AsInt64()
1✔
603
        if !ok {
1✔
604
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes used(%v)", volumeMetrics.InodesUsed)
×
605
        }
×
606

607
        resp := &csi.NodeGetVolumeStatsResponse{
1✔
608
                Usage: []*csi.VolumeUsage{
1✔
609
                        {
1✔
610
                                Unit:      csi.VolumeUsage_BYTES,
1✔
611
                                Available: available,
1✔
612
                                Total:     capacity,
1✔
613
                                Used:      used,
1✔
614
                        },
1✔
615
                        {
1✔
616
                                Unit:      csi.VolumeUsage_INODES,
1✔
617
                                Available: inodesFree,
1✔
618
                                Total:     inodes,
1✔
619
                                Used:      inodesUsed,
1✔
620
                        },
1✔
621
                },
1✔
622
        }
1✔
623

1✔
624
        isOperationSucceeded = true
1✔
625
        klog.V(6).Infof("NodeGetVolumeStats: volume stats for volume %s path %s is %v", req.VolumeId, req.VolumePath, resp)
1✔
626
        // cache the volume stats per volume
1✔
627
        d.volStatsCache.Set(req.VolumeId, resp)
1✔
628
        return resp, nil
1✔
629
}
630

631
// ensureMountPoint: create mount point if not exists
632
// return <true, nil> if it's already a mounted point otherwise return <false, nil>
633
func (d *Driver) ensureMountPoint(target string, perm os.FileMode) (bool, error) {
11✔
634
        notMnt, err := d.mounter.IsLikelyNotMountPoint(target)
11✔
635
        if err != nil && !os.IsNotExist(err) {
13✔
636
                if IsCorruptedDir(target) {
2✔
637
                        notMnt = false
×
638
                        klog.Warningf("detected corrupted mount for targetPath [%s]", target)
×
639
                } else {
2✔
640
                        return !notMnt, err
2✔
641
                }
2✔
642
        }
643

644
        // Check all the mountpoints in case IsLikelyNotMountPoint
645
        // cannot handle --bind mount
646
        mountList, err := d.mounter.List()
9✔
647
        if err != nil {
9✔
648
                return !notMnt, err
×
649
        }
×
650

651
        targetAbs, err := filepath.Abs(target)
9✔
652
        if err != nil {
9✔
653
                return !notMnt, err
×
654
        }
×
655

656
        for _, mountPoint := range mountList {
9✔
657
                if mountPoint.Path == targetAbs {
×
658
                        notMnt = false
×
659
                        break
×
660
                }
661
        }
662

663
        if !notMnt {
11✔
664
                // testing original mount point, make sure the mount link is valid
2✔
665
                _, err := os.ReadDir(target)
2✔
666
                if err == nil {
3✔
667
                        klog.V(2).Infof("already mounted to target %s", target)
1✔
668
                        return !notMnt, nil
1✔
669
                }
1✔
670
                // mount link is invalid, now unmount and remount later
671
                klog.Warningf("ReadDir %s failed with %v, unmount this directory", target, err)
1✔
672
                if err := d.mounter.Unmount(target); err != nil {
1✔
673
                        klog.Errorf("Unmount directory %s failed with %v", target, err)
×
674
                        return !notMnt, err
×
675
                }
×
676
                notMnt = true
1✔
677
                return !notMnt, err
1✔
678
        }
679
        if err := volumehelper.MakeDir(target, perm); err != nil {
9✔
680
                klog.Errorf("MakeDir failed on target: %s (%v)", target, err)
2✔
681
                return !notMnt, err
2✔
682
        }
2✔
683
        return !notMnt, nil
5✔
684
}
685

686
func waitForMount(path string, intervel, timeout time.Duration) error {
2✔
687
        timeAfter := time.After(timeout)
2✔
688
        timeTick := time.Tick(intervel)
2✔
689

2✔
690
        for {
9✔
691
                select {
7✔
692
                case <-timeTick:
6✔
693
                        notMount, err := mount.New("").IsLikelyNotMountPoint(path)
6✔
694
                        if err != nil {
7✔
695
                                return err
1✔
696
                        }
1✔
697
                        if !notMount {
5✔
698
                                klog.V(2).Infof("blobfuse mount at %s success", path)
×
699
                                return nil
×
700
                        }
×
701
                case <-timeAfter:
1✔
702
                        return fmt.Errorf("timeout waiting for mount %s", path)
1✔
703
                }
704
        }
705
}
706

707
// checkGidPresentInMountFlags reports whether at least one entry of
// mountFlags contains the substring "gid=". A nil or empty slice yields false.
func checkGidPresentInMountFlags(mountFlags []string) bool {
	const gidOption = "gid="

	for i := range mountFlags {
		if !strings.Contains(mountFlags[i], gidOption) {
			continue
		}
		return true
	}
	return false
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc