• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 16359912094

18 Jul 2025 01:28AM UTC coverage: 80.461%. Remained the same
16359912094

Pull #2086

github

andyzhangx
feat: install blobfuse 2.5.0 as default version
Pull Request #2086: feat: install blobfuse 2.5.0 as default version

2446 of 3040 relevant lines covered (80.46%)

8.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.24
/pkg/blob/nodeserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "fmt"
21
        "io/fs"
22
        "os"
23
        "os/exec"
24
        "path/filepath"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        volumehelper "sigs.k8s.io/blob-csi-driver/pkg/util"
30
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
31
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
32

33
        "github.com/container-storage-interface/spec/lib/go/csi"
34

35
        "k8s.io/klog/v2"
36
        "k8s.io/kubernetes/pkg/volume"
37
        "k8s.io/kubernetes/pkg/volume/util"
38
        mount "k8s.io/mount-utils"
39

40
        "google.golang.org/grpc/codes"
41
        "google.golang.org/grpc/status"
42

43
        "golang.org/x/net/context"
44
        "google.golang.org/grpc"
45
        mount_azure_blob "sigs.k8s.io/blob-csi-driver/pkg/blobfuse-proxy/pb"
46
)
47

48
const (
49
        waitForMountInterval = 20 * time.Millisecond
50
        waitForMountTimeout  = 60 * time.Second
51
)
52

53
type MountClient struct {
54
        service mount_azure_blob.MountServiceClient
55
}
56

57
// NewMountClient returns a new mount client
58
func NewMountClient(cc *grpc.ClientConn) *MountClient {
3✔
59
        service := mount_azure_blob.NewMountServiceClient(cc)
3✔
60
        return &MountClient{service}
3✔
61
}
3✔
62

63
// NodePublishVolume mount the volume from staging to target path
64
func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
15✔
65
        volCap := req.GetVolumeCapability()
15✔
66
        if volCap == nil {
16✔
67
                return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
1✔
68
        }
1✔
69
        volumeID := req.GetVolumeId()
14✔
70
        if len(req.GetVolumeId()) == 0 {
15✔
71
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
72
        }
1✔
73

74
        target := req.GetTargetPath()
13✔
75
        if len(target) == 0 {
14✔
76
                return nil, status.Error(codes.InvalidArgument, "Target path not provided")
1✔
77
        }
1✔
78

79
        mountPermissions := d.mountPermissions
12✔
80
        context := req.GetVolumeContext()
12✔
81
        if context != nil {
18✔
82
                // token request
6✔
83
                if context[serviceAccountTokenField] != "" && getValueInMap(context, clientIDField) != "" {
7✔
84
                        klog.V(2).Infof("NodePublishVolume: volume(%s) mount on %s with service account token, clientID: %s", volumeID, target, getValueInMap(context, clientIDField))
1✔
85
                        _, err := d.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
1✔
86
                                StagingTargetPath: target,
1✔
87
                                VolumeContext:     context,
1✔
88
                                VolumeCapability:  volCap,
1✔
89
                                VolumeId:          volumeID,
1✔
90
                        })
1✔
91
                        return &csi.NodePublishVolumeResponse{}, err
1✔
92
                }
1✔
93

94
                // ephemeral volume
95
                if strings.EqualFold(context[ephemeralField], trueValue) {
6✔
96
                        setKeyValueInMap(context, secretNamespaceField, context[podNamespaceField])
1✔
97
                        if !d.allowInlineVolumeKeyAccessWithIdentity {
2✔
98
                                // only get storage account from secret
1✔
99
                                setKeyValueInMap(context, getAccountKeyFromSecretField, trueValue)
1✔
100
                                setKeyValueInMap(context, storageAccountField, "")
1✔
101
                        }
1✔
102
                        klog.V(2).Infof("NodePublishVolume: ephemeral volume(%s) mount on %s", volumeID, target)
1✔
103
                        _, err := d.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
1✔
104
                                StagingTargetPath: target,
1✔
105
                                VolumeContext:     context,
1✔
106
                                VolumeCapability:  volCap,
1✔
107
                                VolumeId:          volumeID,
1✔
108
                        })
1✔
109
                        return &csi.NodePublishVolumeResponse{}, err
1✔
110
                }
111

112
                if perm := getValueInMap(context, mountPermissionsField); perm != "" {
8✔
113
                        var err error
4✔
114
                        if mountPermissions, err = strconv.ParseUint(perm, 8, 32); err != nil {
5✔
115
                                return nil, status.Errorf(codes.InvalidArgument, "invalid mountPermissions %s", perm)
1✔
116
                        }
1✔
117
                }
118
        }
119

120
        source := req.GetStagingTargetPath()
9✔
121
        if len(source) == 0 {
10✔
122
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
123
        }
1✔
124

125
        mountOptions := []string{"bind"}
8✔
126
        if req.GetReadonly() {
11✔
127
                mountOptions = append(mountOptions, "ro")
3✔
128
        }
3✔
129

130
        mnt, err := d.ensureMountPoint(target, fs.FileMode(mountPermissions))
8✔
131
        if err != nil {
10✔
132
                return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", target, err)
2✔
133
        }
2✔
134
        if mnt {
7✔
135
                klog.V(2).Infof("NodePublishVolume: volume %s is already mounted on %s", volumeID, target)
1✔
136
                return &csi.NodePublishVolumeResponse{}, nil
1✔
137
        }
1✔
138

139
        klog.V(2).Infof("NodePublishVolume: volume %s mounting %s at %s with mountOptions: %v", volumeID, source, target, mountOptions)
5✔
140
        if d.enableBlobMockMount {
6✔
141
                klog.Warningf("NodePublishVolume: mock mount on volumeID(%s), this is only for TESTING!!!", volumeID)
1✔
142
                if err := volumehelper.MakeDir(target, os.FileMode(mountPermissions)); err != nil {
1✔
143
                        klog.Errorf("MakeDir failed on target: %s (%v)", target, err)
×
144
                        return nil, status.Errorf(codes.Internal, "%v", err)
×
145
                }
×
146
                return &csi.NodePublishVolumeResponse{}, nil
1✔
147
        }
148

149
        if err := d.mounter.Mount(source, target, "", mountOptions); err != nil {
6✔
150
                if removeErr := os.Remove(target); removeErr != nil {
2✔
151
                        return nil, status.Errorf(codes.Internal, "Could not remove mount target %q: %v", target, removeErr)
×
152
                }
×
153
                return nil, status.Errorf(codes.Internal, "Could not mount %q at %q: %v", source, target, err)
2✔
154
        }
155
        klog.V(2).Infof("NodePublishVolume: volume %s mount %s at %s successfully", volumeID, source, target)
2✔
156

2✔
157
        return &csi.NodePublishVolumeResponse{}, nil
2✔
158
}
159

160
func (d *Driver) mountBlobfuseWithProxy(args, protocol string, authEnv []string) (string, error) {
3✔
161
        var resp *mount_azure_blob.MountAzureBlobResponse
3✔
162
        var output string
3✔
163
        connectionTimout := time.Duration(d.blobfuseProxyConnTimout) * time.Second
3✔
164
        ctx, cancel := context.WithTimeout(context.Background(), connectionTimout)
3✔
165
        defer cancel()
3✔
166
        klog.V(2).Infof("start connecting to blobfuse proxy, protocol: %s, args: %s", protocol, args)
3✔
167
        conn, err := grpc.DialContext(ctx, d.blobfuseProxyEndpoint, grpc.WithInsecure(), grpc.WithBlock())
3✔
168
        if err != nil {
4✔
169
                klog.Errorf("failed to connect to blobfuse proxy: %v", err)
1✔
170
                return "", err
1✔
171
        }
1✔
172
        defer func() {
4✔
173
                if err := conn.Close(); err != nil {
2✔
174
                        klog.Errorf("failed to close connection to blobfuse proxy: %v", err)
×
175
                }
×
176
        }()
177

178
        mountClient := NewMountClient(conn)
2✔
179
        mountreq := mount_azure_blob.MountAzureBlobRequest{
2✔
180
                MountArgs: args,
2✔
181
                Protocol:  protocol,
2✔
182
                AuthEnv:   authEnv,
2✔
183
        }
2✔
184
        klog.V(2).Infof("begin to mount with blobfuse proxy, protocol: %s, args: %s", protocol, args)
2✔
185
        resp, err = mountClient.service.MountAzureBlob(context.TODO(), &mountreq)
2✔
186
        if err != nil {
3✔
187
                klog.Error("GRPC call returned with an error:", err)
1✔
188
        }
1✔
189
        output = resp.GetOutput()
2✔
190

2✔
191
        return output, err
2✔
192
}
193

194
func (d *Driver) mountBlobfuseInsideDriver(args string, protocol string, authEnv []string) (string, error) {
1✔
195
        var cmd *exec.Cmd
1✔
196

1✔
197
        args = volumehelper.TrimDuplicatedSpace(args)
1✔
198

1✔
199
        mountLog := "mount inside driver with"
1✔
200
        if protocol == Fuse2 {
1✔
201
                mountLog += " v2"
×
202
                args = "mount " + args
×
203
                cmd = exec.Command("blobfuse2", strings.Split(args, " ")...)
×
204
        } else {
1✔
205
                mountLog += " v1"
1✔
206
                cmd = exec.Command("blobfuse", strings.Split(args, " ")...)
1✔
207
        }
1✔
208
        klog.V(2).Infof("%s, protocol: %s, args: %s", mountLog, protocol, args)
1✔
209

1✔
210
        cmd.Env = append(os.Environ(), authEnv...)
1✔
211
        output, err := cmd.CombinedOutput()
1✔
212
        klog.V(2).Infof("mount output: %s\n", string(output))
1✔
213

1✔
214
        return string(output), err
1✔
215
}
216

217
// NodeUnpublishVolume unmount the volume from the target path
218
func (d *Driver) NodeUnpublishVolume(_ context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
3✔
219
        volumeID := req.GetVolumeId()
3✔
220
        if len(volumeID) == 0 {
4✔
221
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
222
        }
1✔
223
        targetPath := req.GetTargetPath()
2✔
224
        if len(targetPath) == 0 {
3✔
225
                return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
1✔
226
        }
1✔
227

228
        klog.V(2).Infof("NodeUnpublishVolume: unmounting volume %s on %s", volumeID, targetPath)
1✔
229
        err := mount.CleanupMountPoint(targetPath, d.mounter, true /*extensiveMountPointCheck*/)
1✔
230
        if err != nil {
1✔
231
                return nil, status.Errorf(codes.Internal, "failed to unmount target %q: %v", targetPath, err)
×
232
        }
×
233
        klog.V(2).Infof("NodeUnpublishVolume: unmount volume %s on %s successfully", volumeID, targetPath)
1✔
234

1✔
235
        return &csi.NodeUnpublishVolumeResponse{}, nil
1✔
236
}
237

238
// NodeStageVolume mount the volume to a staging path
239
func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
12✔
240
        volumeID := req.GetVolumeId()
12✔
241
        if len(volumeID) == 0 {
13✔
242
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
243
        }
1✔
244
        targetPath := req.GetStagingTargetPath()
11✔
245
        if len(targetPath) == 0 {
12✔
246
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
247
        }
1✔
248
        volumeCapability := req.GetVolumeCapability()
10✔
249
        if volumeCapability == nil {
11✔
250
                return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
1✔
251
        }
1✔
252

253
        lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
9✔
254
        if acquired := d.volumeLocks.TryAcquire(lockKey); !acquired {
11✔
255
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
2✔
256
        }
2✔
257
        defer d.volumeLocks.Release(lockKey)
7✔
258

7✔
259
        mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
7✔
260
        volumeMountGroup := req.GetVolumeCapability().GetMount().GetVolumeMountGroup()
7✔
261
        attrib := req.GetVolumeContext()
7✔
262
        secrets := req.GetSecrets()
7✔
263

7✔
264
        if getValueInMap(attrib, clientIDField) != "" && attrib[serviceAccountTokenField] == "" {
7✔
265
                klog.V(2).Infof("Skip NodeStageVolume for volume(%s) since clientID %s is provided but service account token is empty", volumeID, getValueInMap(attrib, clientIDField))
×
266
                return &csi.NodeStageVolumeResponse{}, nil
×
267
        }
×
268

269
        var serverAddress, storageEndpointSuffix, protocol, ephemeralVolMountOptions string
7✔
270
        var ephemeralVol, isHnsEnabled bool
7✔
271

7✔
272
        containerNameReplaceMap := map[string]string{}
7✔
273

7✔
274
        fsGroupChangePolicy := d.fsGroupChangePolicy
7✔
275

7✔
276
        mountPermissions := d.mountPermissions
7✔
277
        performChmodOp := (mountPermissions > 0)
7✔
278
        for k, v := range attrib {
24✔
279
                switch strings.ToLower(k) {
17✔
280
                case serverNameField:
×
281
                        serverAddress = v
×
282
                case protocolField:
2✔
283
                        protocol = v
2✔
284
                case storageEndpointSuffixField:
×
285
                        storageEndpointSuffix = v
×
286
                case ephemeralField:
1✔
287
                        ephemeralVol = strings.EqualFold(v, trueValue)
1✔
288
                case mountOptionsField:
×
289
                        ephemeralVolMountOptions = v
×
290
                case isHnsEnabledField:
×
291
                        isHnsEnabled = strings.EqualFold(v, trueValue)
×
292
                case pvcNamespaceKey:
×
293
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
294
                case pvcNameKey:
×
295
                        containerNameReplaceMap[pvcNameMetadata] = v
×
296
                case pvNameKey:
×
297
                        containerNameReplaceMap[pvNameMetadata] = v
×
298
                case mountPermissionsField:
4✔
299
                        if v != "" {
8✔
300
                                var err error
4✔
301
                                var perm uint64
4✔
302
                                if perm, err = strconv.ParseUint(v, 8, 32); err != nil {
5✔
303
                                        return nil, status.Errorf(codes.InvalidArgument, "invalid mountPermissions %s", v)
1✔
304
                                }
1✔
305
                                if perm == 0 {
3✔
306
                                        performChmodOp = false
×
307
                                } else {
3✔
308
                                        mountPermissions = perm
3✔
309
                                }
3✔
310
                        }
311
                case fsGroupChangePolicyField:
1✔
312
                        fsGroupChangePolicy = v
1✔
313
                }
314
        }
315

316
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
7✔
317
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
318
        }
1✔
319

320
        mnt, err := d.ensureMountPoint(targetPath, fs.FileMode(mountPermissions))
5✔
321
        if err != nil {
6✔
322
                return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", targetPath, err)
1✔
323
        }
1✔
324

325
        _, accountName, _, containerName, authEnv, err := d.GetAuthEnv(ctx, volumeID, protocol, attrib, secrets)
4✔
326
        if err != nil && !mnt {
4✔
327
                return nil, status.Errorf(codes.Internal, "%v", err)
×
328
        }
×
329

330
        if mnt {
4✔
331
                klog.V(2).Infof("NodeStageVolume: volume %s is already mounted on %s", volumeID, targetPath)
×
332
                return &csi.NodeStageVolumeResponse{}, nil
×
333
        }
×
334

335
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_stage_volume", d.cloud.ResourceGroup, "", d.Name)
4✔
336
        isOperationSucceeded := false
4✔
337
        defer func() {
8✔
338
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
4✔
339
        }()
4✔
340

341
        // replace pv/pvc name namespace metadata in subDir
342
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
4✔
343

4✔
344
        if strings.TrimSpace(storageEndpointSuffix) == "" {
8✔
345
                storageEndpointSuffix = d.getStorageEndPointSuffix()
4✔
346
        }
4✔
347

348
        if strings.TrimSpace(serverAddress) == "" {
8✔
349
                // server address is "accountname.blob.core.windows.net" by default
4✔
350
                serverAddress = fmt.Sprintf("%s.blob.%s", accountName, storageEndpointSuffix)
4✔
351
        }
4✔
352

353
        if isReadOnlyFromCapability(volumeCapability) {
4✔
354
                if isNFSProtocol(protocol) {
×
355
                        mountFlags = util.JoinMountOptions(mountFlags, []string{"ro"})
×
356
                } else {
×
357
                        mountFlags = util.JoinMountOptions(mountFlags, []string{"-o ro"})
×
358
                }
×
359
                klog.V(2).Infof("CSI volume is read-only, mounting with extra option ro")
×
360
        }
361

362
        if isNFSProtocol(protocol) {
5✔
363
                klog.V(2).Infof("target %v\nprotocol %v\n\nvolumeId %v\nmountflags %v\nserverAddress %v",
1✔
364
                        targetPath, protocol, volumeID, mountFlags, serverAddress)
1✔
365

1✔
366
                mountType := AZNFS
1✔
367
                if !d.enableAznfsMount || protocol == NFSv3 {
2✔
368
                        mountType = NFS
1✔
369
                }
1✔
370

371
                if storageEndpointSuffix != "" && mountType == AZNFS {
1✔
372
                        aznfsEndpoint := strings.Replace(storageEndpointSuffix, "core.", "", 1)
×
373
                        klog.V(2).Infof("set AZURE_ENDPOINT_OVERRIDE to %s", aznfsEndpoint)
×
374
                        os.Setenv("AZURE_ENDPOINT_OVERRIDE", aznfsEndpoint)
×
375
                }
×
376

377
                source := fmt.Sprintf("%s:/%s/%s", serverAddress, accountName, containerName)
1✔
378
                mountOptions := util.JoinMountOptions(mountFlags, []string{"sec=sys,vers=3,nolock"})
1✔
379
                execFunc := func() error { return d.mounter.MountSensitive(source, targetPath, mountType, mountOptions, []string{}) }
2✔
380
                timeoutFunc := func() error { return fmt.Errorf("time out") }
1✔
381
                if err := volumehelper.WaitUntilTimeout(90*time.Second, execFunc, timeoutFunc); err != nil {
1✔
382
                        var helpLinkMsg string
×
383
                        if d.appendMountErrorHelpLink {
×
384
                                helpLinkMsg = "\nPlease refer to http://aka.ms/blobmounterror for possible causes and solutions for mount errors."
×
385
                        }
×
386
                        return nil, status.Error(codes.Internal, fmt.Sprintf("volume(%s) mount %q on %q failed with %v%s", volumeID, source, targetPath, err, helpLinkMsg))
×
387
                }
388

389
                if performChmodOp {
1✔
390
                        if err := chmodIfPermissionMismatch(targetPath, os.FileMode(mountPermissions)); err != nil {
×
391
                                return nil, status.Error(codes.Internal, err.Error())
×
392
                        }
×
393
                } else {
1✔
394
                        klog.V(2).Infof("skip chmod on targetPath(%s) since mountPermissions is set as 0", targetPath)
1✔
395
                }
1✔
396

397
                if volumeMountGroup != "" && fsGroupChangePolicy != FSGroupChangeNone {
1✔
398
                        klog.V(2).Infof("set gid of volume(%s) as %s using fsGroupChangePolicy(%s)", volumeID, volumeMountGroup, fsGroupChangePolicy)
×
399
                        if err := volumehelper.SetVolumeOwnership(targetPath, volumeMountGroup, fsGroupChangePolicy); err != nil {
×
400
                                return nil, status.Error(codes.Internal, fmt.Sprintf("SetVolumeOwnership with volume(%s) on %s failed with %v", volumeID, targetPath, err))
×
401
                        }
×
402
                }
403

404
                isOperationSucceeded = true
1✔
405
                klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, targetPath)
1✔
406
                return &csi.NodeStageVolumeResponse{}, nil
1✔
407
        }
408

409
        // Get mountOptions that the volume will be formatted and mounted with
410
        mountOptions := mountFlags
3✔
411
        if ephemeralVol {
4✔
412
                mountOptions = util.JoinMountOptions(mountOptions, strings.Split(ephemeralVolMountOptions, ","))
1✔
413
        }
1✔
414
        if isHnsEnabled {
3✔
415
                mountOptions = util.JoinMountOptions(mountOptions, []string{"--use-adls=true"})
×
416
        }
×
417

418
        if !checkGidPresentInMountFlags(mountFlags) && volumeMountGroup != "" {
3✔
419
                klog.V(2).Infof("append volumeMountGroup %s", volumeMountGroup)
×
420
                mountOptions = append(mountOptions, fmt.Sprintf("-o gid=%s", volumeMountGroup))
×
421
        }
×
422

423
        tmpPath := fmt.Sprintf("%s/%s", "/mnt", volumeID)
3✔
424
        if d.appendTimeStampInCacheDir {
3✔
425
                tmpPath += fmt.Sprintf("#%d", time.Now().Unix())
×
426
        }
×
427
        mountOptions = appendDefaultMountOptions(mountOptions, tmpPath, containerName)
3✔
428

3✔
429
        args := targetPath
3✔
430
        for _, opt := range mountOptions {
22✔
431
                args = args + " " + opt
19✔
432
        }
19✔
433

434
        klog.V(2).Infof("target %v protocol %v volumeId %v\nmountflags %v\nmountOptions %v volumeMountGroup %s\nargs %v\nserverAddress %v",
3✔
435
                targetPath, protocol, volumeID, mountFlags, mountOptions, volumeMountGroup, args, serverAddress)
3✔
436

3✔
437
        authEnv = append(authEnv, "AZURE_STORAGE_ACCOUNT="+accountName, "AZURE_STORAGE_BLOB_ENDPOINT="+serverAddress)
3✔
438
        if d.enableBlobMockMount {
6✔
439
                klog.Warningf("NodeStageVolume: mock mount on volumeID(%s), this is only for TESTING!!!", volumeID)
3✔
440
                if err := volumehelper.MakeDir(targetPath, os.FileMode(mountPermissions)); err != nil {
3✔
441
                        klog.Errorf("MakeDir failed on target: %s (%v)", targetPath, err)
×
442
                        return nil, status.Errorf(codes.Internal, "%v", err)
×
443
                }
×
444
                return &csi.NodeStageVolumeResponse{}, nil
3✔
445
        }
446

447
        var output string
×
448
        if d.enableBlobfuseProxy {
×
449
                output, err = d.mountBlobfuseWithProxy(args, protocol, authEnv)
×
450
        } else {
×
451
                output, err = d.mountBlobfuseInsideDriver(args, protocol, authEnv)
×
452
        }
×
453

454
        if err != nil {
×
455
                var helpLinkMsg string
×
456
                if d.appendMountErrorHelpLink {
×
457
                        helpLinkMsg = "\nPlease refer to http://aka.ms/blobmounterror for possible causes and solutions for mount errors."
×
458
                }
×
459
                err = status.Errorf(codes.Internal, "Mount failed with error: %v, output: %v%s", err, output, helpLinkMsg)
×
460
                klog.Errorf("%v", err)
×
461
                notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
×
462
                if mntErr != nil {
×
463
                        klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
×
464
                        return nil, err
×
465
                }
×
466
                if !notMnt {
×
467
                        if mntErr = d.mounter.Unmount(targetPath); mntErr != nil {
×
468
                                klog.Errorf("Failed to unmount: %v", mntErr)
×
469
                                return nil, err
×
470
                        }
×
471
                        notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
×
472
                        if mntErr != nil {
×
473
                                klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
×
474
                                return nil, err
×
475
                        }
×
476
                        if !notMnt {
×
477
                                // This is very odd, we don't expect it.  We'll try again next sync loop.
×
478
                                klog.Errorf("%s is still mounted, despite call to unmount().  Will try again next sync loop.", targetPath)
×
479
                                return nil, err
×
480
                        }
×
481
                }
482
                os.Remove(targetPath)
×
483
                return nil, err
×
484
        }
485

486
        // wait a few seconds to make sure blobfuse mount is successful
487
        // please refer to https://github.com/Azure/azure-storage-fuse/pull/1088 for more details
488
        if err := waitForMount(targetPath, waitForMountInterval, waitForMountTimeout); err != nil {
×
489
                return nil, fmt.Errorf("failed to wait for mount: %w", err)
×
490
        }
×
491

492
        klog.V(2).Infof("volume(%s) mount on %q succeeded", volumeID, targetPath)
×
493
        isOperationSucceeded = true
×
494
        return &csi.NodeStageVolumeResponse{}, nil
×
495
}
496

497
// NodeUnstageVolume unmount the volume from the staging path
498
func (d *Driver) NodeUnstageVolume(_ context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
3✔
499
        volumeID := req.GetVolumeId()
3✔
500
        if len(volumeID) == 0 {
4✔
501
                return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
1✔
502
        }
1✔
503

504
        stagingTargetPath := req.GetStagingTargetPath()
2✔
505
        if len(stagingTargetPath) == 0 {
3✔
506
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
507
        }
1✔
508

509
        lockKey := fmt.Sprintf("%s-%s", volumeID, stagingTargetPath)
1✔
510
        if acquired := d.volumeLocks.TryAcquire(lockKey); !acquired {
1✔
511
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
512
        }
×
513
        defer d.volumeLocks.Release(lockKey)
1✔
514

1✔
515
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_unstage_volume", d.cloud.ResourceGroup, "", d.Name)
1✔
516
        isOperationSucceeded := false
1✔
517
        defer func() {
2✔
518
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
1✔
519
        }()
1✔
520

521
        klog.V(2).Infof("NodeUnstageVolume: volume %s unmounting on %s", volumeID, stagingTargetPath)
1✔
522
        err := mount.CleanupMountPoint(stagingTargetPath, d.mounter, true /*extensiveMountPointCheck*/)
1✔
523
        if err != nil {
1✔
524
                return nil, status.Errorf(codes.Internal, "failed to unmount staging target %q: %v", stagingTargetPath, err)
×
525
        }
×
526
        klog.V(2).Infof("NodeUnstageVolume: volume %s unmount on %s successfully", volumeID, stagingTargetPath)
1✔
527

1✔
528
        isOperationSucceeded = true
1✔
529
        return &csi.NodeUnstageVolumeResponse{}, nil
1✔
530
}
531

532
// NodeGetCapabilities return the capabilities of the Node plugin
533
func (d *Driver) NodeGetCapabilities(_ context.Context, _ *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
1✔
534
        return &csi.NodeGetCapabilitiesResponse{
1✔
535
                Capabilities: d.NSCap,
1✔
536
        }, nil
1✔
537
}
1✔
538

539
// NodeGetInfo return info of the node on which this plugin is running
540
func (d *Driver) NodeGetInfo(_ context.Context, _ *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
1✔
541
        return &csi.NodeGetInfoResponse{
1✔
542
                NodeId: d.NodeID,
1✔
543
        }, nil
1✔
544
}
1✔
545

546
// NodeExpandVolume node expand volume
547
func (d *Driver) NodeExpandVolume(_ context.Context, _ *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
1✔
548
        return nil, status.Error(codes.Unimplemented, "NodeExpandVolume is not yet implemented")
1✔
549
}
1✔
550

551
// NodeGetVolumeStats get volume stats
552
func (d *Driver) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
4✔
553
        if len(req.VolumeId) == 0 {
5✔
554
                return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty")
1✔
555
        }
1✔
556
        if len(req.VolumePath) == 0 {
4✔
557
                return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume path was empty")
1✔
558
        }
1✔
559

560
        // check if the volume stats is cached
561
        cache, err := d.volStatsCache.Get(ctx, req.VolumeId, azcache.CacheReadTypeDefault)
2✔
562
        if err != nil {
2✔
563
                return nil, status.Errorf(codes.Internal, "%v", err)
×
564
        }
×
565
        if cache != nil {
2✔
566
                resp := cache.(*csi.NodeGetVolumeStatsResponse)
×
567
                klog.V(6).Infof("NodeGetVolumeStats: volume stats for volume %s path %s is cached", req.VolumeId, req.VolumePath)
×
568
                return resp, nil
×
569
        }
×
570

571
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_get_volume_stats", d.cloud.ResourceGroup, "", d.Name)
2✔
572
        mc.LogLevel = 6 // change log level
2✔
573
        isOperationSucceeded := false
2✔
574
        defer func() {
4✔
575
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, req.VolumeId)
2✔
576
        }()
2✔
577

578
        if _, err := os.Lstat(req.VolumePath); err != nil {
3✔
579
                if os.IsNotExist(err) {
2✔
580
                        return nil, status.Errorf(codes.NotFound, "path %s does not exist", req.VolumePath)
1✔
581
                }
1✔
582
                return nil, status.Errorf(codes.Internal, "failed to stat file %s: %v", req.VolumePath, err)
×
583
        }
584

585
        klog.V(6).Infof("NodeGetVolumeStats: begin to get VolumeStats on volume %s path %s", req.VolumeId, req.VolumePath)
1✔
586

1✔
587
        volumeMetrics, err := volume.NewMetricsStatFS(req.VolumePath).GetMetrics()
1✔
588
        if err != nil {
1✔
589
                return nil, status.Errorf(codes.Internal, "failed to get metrics: %v", err)
×
590
        }
×
591

592
        available, ok := volumeMetrics.Available.AsInt64()
1✔
593
        if !ok {
1✔
594
                return nil, status.Errorf(codes.Internal, "failed to transform volume available size(%v)", volumeMetrics.Available)
×
595
        }
×
596
        capacity, ok := volumeMetrics.Capacity.AsInt64()
1✔
597
        if !ok {
1✔
598
                return nil, status.Errorf(codes.Internal, "failed to transform volume capacity size(%v)", volumeMetrics.Capacity)
×
599
        }
×
600
        used, ok := volumeMetrics.Used.AsInt64()
1✔
601
        if !ok {
1✔
602
                return nil, status.Errorf(codes.Internal, "failed to transform volume used size(%v)", volumeMetrics.Used)
×
603
        }
×
604

605
        inodesFree, ok := volumeMetrics.InodesFree.AsInt64()
1✔
606
        if !ok {
1✔
607
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes free(%v)", volumeMetrics.InodesFree)
×
608
        }
×
609
        inodes, ok := volumeMetrics.Inodes.AsInt64()
1✔
610
        if !ok {
1✔
611
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes(%v)", volumeMetrics.Inodes)
×
612
        }
×
613
        inodesUsed, ok := volumeMetrics.InodesUsed.AsInt64()
1✔
614
        if !ok {
1✔
615
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes used(%v)", volumeMetrics.InodesUsed)
×
616
        }
×
617

618
        resp := &csi.NodeGetVolumeStatsResponse{
1✔
619
                Usage: []*csi.VolumeUsage{
1✔
620
                        {
1✔
621
                                Unit:      csi.VolumeUsage_BYTES,
1✔
622
                                Available: available,
1✔
623
                                Total:     capacity,
1✔
624
                                Used:      used,
1✔
625
                        },
1✔
626
                        {
1✔
627
                                Unit:      csi.VolumeUsage_INODES,
1✔
628
                                Available: inodesFree,
1✔
629
                                Total:     inodes,
1✔
630
                                Used:      inodesUsed,
1✔
631
                        },
1✔
632
                },
1✔
633
        }
1✔
634

1✔
635
        isOperationSucceeded = true
1✔
636
        klog.V(6).Infof("NodeGetVolumeStats: volume stats for volume %s path %s is %v", req.VolumeId, req.VolumePath, resp)
1✔
637
        // cache the volume stats per volume
1✔
638
        d.volStatsCache.Set(req.VolumeId, resp)
1✔
639
        return resp, nil
1✔
640
}
641

642
// ensureMountPoint: create mount point if not exists
643
// return <true, nil> if it's already a mounted point otherwise return <false, nil>
644
func (d *Driver) ensureMountPoint(target string, perm os.FileMode) (bool, error) {
18✔
645
        notMnt, err := d.mounter.IsLikelyNotMountPoint(target)
18✔
646
        if err != nil && !os.IsNotExist(err) {
20✔
647
                if IsCorruptedDir(target) {
2✔
648
                        notMnt = false
×
649
                        klog.Warningf("detected corrupted mount for targetPath [%s]", target)
×
650
                } else {
2✔
651
                        return !notMnt, err
2✔
652
                }
2✔
653
        }
654

655
        // Check all the mountpoints in case IsLikelyNotMountPoint
656
        // cannot handle --bind mount
657
        mountList, err := d.mounter.List()
16✔
658
        if err != nil {
16✔
659
                return !notMnt, err
×
660
        }
×
661

662
        targetAbs, err := filepath.Abs(target)
16✔
663
        if err != nil {
16✔
664
                return !notMnt, err
×
665
        }
×
666

667
        for _, mountPoint := range mountList {
16✔
668
                if mountPoint.Path == targetAbs {
×
669
                        notMnt = false
×
670
                        break
×
671
                }
672
        }
673

674
        if !notMnt {
19✔
675
                // testing original mount point, make sure the mount link is valid
3✔
676
                _, err := os.ReadDir(target)
3✔
677
                if err == nil {
5✔
678
                        klog.V(2).Infof("already mounted to target %s", target)
2✔
679
                        return !notMnt, nil
2✔
680
                }
2✔
681
                // mount link is invalid, now unmount and remount later
682
                klog.Warningf("ReadDir %s failed with %v, unmount this directory", target, err)
1✔
683
                if err := d.mounter.Unmount(target); err != nil {
1✔
684
                        klog.Errorf("Unmount directory %s failed with %v", target, err)
×
685
                        return !notMnt, err
×
686
                }
×
687
                notMnt = true
1✔
688
                return !notMnt, err
1✔
689
        }
690
        if err := volumehelper.MakeDir(target, perm); err != nil {
16✔
691
                klog.Errorf("MakeDir failed on target: %s (%v)", target, err)
3✔
692
                return !notMnt, err
3✔
693
        }
3✔
694
        return !notMnt, nil
10✔
695
}
696

697
func waitForMount(path string, intervel, timeout time.Duration) error {
2✔
698
        timeAfter := time.After(timeout)
2✔
699
        timeTick := time.Tick(intervel)
2✔
700

2✔
701
        for {
9✔
702
                select {
7✔
703
                case <-timeTick:
6✔
704
                        notMount, err := mount.New("").IsLikelyNotMountPoint(path)
6✔
705
                        if err != nil {
7✔
706
                                return err
1✔
707
                        }
1✔
708
                        if !notMount {
5✔
709
                                klog.V(2).Infof("blobfuse mount at %s success", path)
×
710
                                return nil
×
711
                        }
×
712
                case <-timeAfter:
1✔
713
                        return fmt.Errorf("timeout waiting for mount %s", path)
1✔
714
                }
715
        }
716
}
717

718
func checkGidPresentInMountFlags(mountFlags []string) bool {
6✔
719
        for _, mountFlag := range mountFlags {
8✔
720
                if strings.Contains(mountFlag, "gid=") {
4✔
721
                        return true
2✔
722
                }
2✔
723
        }
724
        return false
4✔
725
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc