• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 4961304539

12 May 2023 05:20PM UTC coverage: 80.672%. First build
4961304539

push

github

GitHub
chore(deps): bump golang.org/x/net from 0.9.0 to 0.10.0

1824 of 2261 relevant lines covered (80.67%)

5.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.35
/pkg/blob/nodeserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "fmt"
21
        "io/fs"
22
        "os"
23
        "os/exec"
24
        "path/filepath"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        volumehelper "sigs.k8s.io/blob-csi-driver/pkg/util"
30

31
        "github.com/Azure/azure-sdk-for-go/storage"
32
        "github.com/container-storage-interface/spec/lib/go/csi"
33

34
        "k8s.io/apimachinery/pkg/util/wait"
35
        "k8s.io/klog/v2"
36
        "k8s.io/kubernetes/pkg/volume"
37
        "k8s.io/kubernetes/pkg/volume/util"
38
        mount "k8s.io/mount-utils"
39

40
        "google.golang.org/grpc/codes"
41
        "google.golang.org/grpc/status"
42

43
        "golang.org/x/net/context"
44
        "google.golang.org/grpc"
45
        mount_azure_blob "sigs.k8s.io/blob-csi-driver/pkg/blobfuse-proxy/pb"
46
)
47

48
// Polling parameters passed to waitForMount after a blobfuse mount command
// has returned.
const (
	// waitForMountInterval is the delay between two mount point checks.
	waitForMountInterval = 20 * time.Millisecond
	// waitForMountTimeout is the total time allowed for the mount to appear.
	waitForMountTimeout = time.Minute
)
52

53
type MountClient struct {
54
        service mount_azure_blob.MountServiceClient
55
}
56

57
// NewMountClient returns a new mount client
58
func NewMountClient(cc *grpc.ClientConn) *MountClient {
1✔
59
        service := mount_azure_blob.NewMountServiceClient(cc)
1✔
60
        return &MountClient{service}
1✔
61
}
1✔
62

63
// NodePublishVolume mount the volume from staging to target path
64
func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
8✔
65
        volCap := req.GetVolumeCapability()
8✔
66
        if volCap == nil {
9✔
67
                return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
1✔
68
        }
1✔
69
        volumeID := req.GetVolumeId()
7✔
70
        if len(req.GetVolumeId()) == 0 {
8✔
71
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
72
        }
1✔
73

74
        target := req.GetTargetPath()
6✔
75
        if len(target) == 0 {
7✔
76
                return nil, status.Error(codes.InvalidArgument, "Target path not provided")
1✔
77
        }
1✔
78

79
        mountPermissions := d.mountPermissions
5✔
80
        context := req.GetVolumeContext()
5✔
81
        if context != nil {
7✔
82
                if strings.EqualFold(context[ephemeralField], trueValue) {
2✔
83
                        setKeyValueInMap(context, secretNamespaceField, context[podNamespaceField])
×
84
                        if !d.allowInlineVolumeKeyAccessWithIdentity {
×
85
                                // only get storage account from secret
×
86
                                setKeyValueInMap(context, getAccountKeyFromSecretField, trueValue)
×
87
                                setKeyValueInMap(context, storageAccountField, "")
×
88
                        }
×
89
                        klog.V(2).Infof("NodePublishVolume: ephemeral volume(%s) mount on %s, VolumeContext: %v", volumeID, target, context)
×
90
                        _, err := d.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
×
91
                                StagingTargetPath: target,
×
92
                                VolumeContext:     context,
×
93
                                VolumeCapability:  volCap,
×
94
                                VolumeId:          volumeID,
×
95
                        })
×
96
                        return &csi.NodePublishVolumeResponse{}, err
×
97
                }
98

99
                if perm := context[mountPermissionsField]; perm != "" {
4✔
100
                        var err error
2✔
101
                        if mountPermissions, err = strconv.ParseUint(perm, 8, 32); err != nil {
3✔
102
                                return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s", perm))
1✔
103
                        }
1✔
104
                }
105
        }
106

107
        source := req.GetStagingTargetPath()
4✔
108
        if len(source) == 0 {
5✔
109
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
110
        }
1✔
111

112
        mountOptions := []string{"bind"}
3✔
113
        if req.GetReadonly() {
6✔
114
                mountOptions = append(mountOptions, "ro")
3✔
115
        }
3✔
116

117
        mnt, err := d.ensureMountPoint(target, fs.FileMode(mountPermissions))
3✔
118
        if err != nil {
4✔
119
                return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", target, err)
1✔
120
        }
1✔
121
        if mnt {
2✔
122
                klog.V(2).Infof("NodePublishVolume: volume %s is already mounted on %s", volumeID, target)
×
123
                return &csi.NodePublishVolumeResponse{}, nil
×
124
        }
×
125

126
        klog.V(2).Infof("NodePublishVolume: volume %s mounting %s at %s with mountOptions: %v", volumeID, source, target, mountOptions)
2✔
127
        if d.enableBlobMockMount {
2✔
128
                klog.Warningf("NodePublishVolume: mock mount on volumeID(%s), this is only for TESTING!!!", volumeID)
×
129
                if err := volumehelper.MakeDir(target, os.FileMode(mountPermissions)); err != nil {
×
130
                        klog.Errorf("MakeDir failed on target: %s (%v)", target, err)
×
131
                        return nil, status.Errorf(codes.Internal, err.Error())
×
132
                }
×
133
                return &csi.NodePublishVolumeResponse{}, nil
×
134
        }
135

136
        if err := d.mounter.Mount(source, target, "", mountOptions); err != nil {
2✔
137
                if removeErr := os.Remove(target); removeErr != nil {
×
138
                        return nil, status.Errorf(codes.Internal, "Could not remove mount target %q: %v", target, removeErr)
×
139
                }
×
140
                return nil, status.Errorf(codes.Internal, "Could not mount %q at %q: %v", source, target, err)
×
141
        }
142
        klog.V(2).Infof("NodePublishVolume: volume %s mount %s at %s successfully", volumeID, source, target)
2✔
143

2✔
144
        return &csi.NodePublishVolumeResponse{}, nil
2✔
145
}
146

147
func (d *Driver) mountBlobfuseWithProxy(args, protocol string, authEnv []string) (string, error) {
1✔
148
        var resp *mount_azure_blob.MountAzureBlobResponse
1✔
149
        var output string
1✔
150
        connectionTimout := time.Duration(d.blobfuseProxyConnTimout) * time.Second
1✔
151
        ctx, cancel := context.WithTimeout(context.Background(), connectionTimout)
1✔
152
        defer cancel()
1✔
153
        klog.V(2).Infof("start connecting to blobfuse proxy, protocol: %s, args: %s", protocol, args)
1✔
154
        conn, err := grpc.DialContext(ctx, d.blobfuseProxyEndpoint, grpc.WithInsecure(), grpc.WithBlock())
1✔
155
        if err == nil {
1✔
156
                mountClient := NewMountClient(conn)
×
157
                mountreq := mount_azure_blob.MountAzureBlobRequest{
×
158
                        MountArgs: args,
×
159
                        Protocol:  protocol,
×
160
                        AuthEnv:   authEnv,
×
161
                }
×
162
                klog.V(2).Infof("begin to mount with blobfuse proxy, protocol: %s, args: %s", protocol, args)
×
163
                resp, err = mountClient.service.MountAzureBlob(context.TODO(), &mountreq)
×
164
                if err != nil {
×
165
                        klog.Error("GRPC call returned with an error:", err)
×
166
                }
×
167
                output = resp.GetOutput()
×
168
        }
169
        return output, err
1✔
170
}
171

172
func (d *Driver) mountBlobfuseInsideDriver(args string, protocol string, authEnv []string) (string, error) {
1✔
173
        var cmd *exec.Cmd
1✔
174

1✔
175
        args = volumehelper.TrimDuplicatedSpace(args)
1✔
176

1✔
177
        mountLog := "mount inside driver with"
1✔
178
        if protocol == Fuse2 {
1✔
179
                mountLog += " v2"
×
180
                args = "mount " + args
×
181
                cmd = exec.Command("blobfuse2", strings.Split(args, " ")...)
×
182
        } else {
1✔
183
                mountLog += " v1"
1✔
184
                cmd = exec.Command("blobfuse", strings.Split(args, " ")...)
1✔
185
        }
1✔
186
        klog.V(2).Infof("%s, protocol: %s, args: %s", mountLog, protocol, args)
1✔
187

1✔
188
        cmd.Env = append(os.Environ(), authEnv...)
1✔
189
        output, err := cmd.CombinedOutput()
1✔
190
        klog.V(2).Infof("mount output: %s\n", string(output))
1✔
191

1✔
192
        return string(output), err
1✔
193
}
194

195
// NodeUnpublishVolume unmount the volume from the target path
196
func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
3✔
197
        volumeID := req.GetVolumeId()
3✔
198
        if len(volumeID) == 0 {
4✔
199
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
200
        }
1✔
201
        targetPath := req.GetTargetPath()
2✔
202
        if len(targetPath) == 0 {
3✔
203
                return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
1✔
204
        }
1✔
205

206
        klog.V(2).Infof("NodeUnpublishVolume: unmounting volume %s on %s", volumeID, targetPath)
1✔
207
        err := mount.CleanupMountPoint(targetPath, d.mounter, true /*extensiveMountPointCheck*/)
1✔
208
        if err != nil {
1✔
209
                return nil, status.Errorf(codes.Internal, "failed to unmount target %q: %v", targetPath, err)
×
210
        }
×
211
        klog.V(2).Infof("NodeUnpublishVolume: unmount volume %s on %s successfully", volumeID, targetPath)
1✔
212

1✔
213
        return &csi.NodeUnpublishVolumeResponse{}, nil
1✔
214
}
215

216
// NodeStageVolume mount the volume to a staging path
217
func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
9✔
218
        volumeID := req.GetVolumeId()
9✔
219
        if len(volumeID) == 0 {
10✔
220
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
221
        }
1✔
222
        targetPath := req.GetStagingTargetPath()
8✔
223
        if len(targetPath) == 0 {
9✔
224
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
225
        }
1✔
226
        volumeCapability := req.GetVolumeCapability()
7✔
227
        if volumeCapability == nil {
8✔
228
                return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
1✔
229
        }
1✔
230

231
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
8✔
232
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
2✔
233
        }
2✔
234
        defer d.volumeLocks.Release(volumeID)
4✔
235

4✔
236
        mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
4✔
237
        attrib := req.GetVolumeContext()
4✔
238
        secrets := req.GetSecrets()
4✔
239

4✔
240
        var serverAddress, storageEndpointSuffix, protocol, ephemeralVolMountOptions string
4✔
241
        var ephemeralVol, isHnsEnabled bool
4✔
242

4✔
243
        containerNameReplaceMap := map[string]string{}
4✔
244

4✔
245
        mountPermissions := d.mountPermissions
4✔
246
        performChmodOp := (mountPermissions > 0)
4✔
247
        for k, v := range attrib {
10✔
248
                switch strings.ToLower(k) {
6✔
249
                case serverNameField:
×
250
                        serverAddress = v
×
251
                case protocolField:
2✔
252
                        protocol = v
2✔
253
                case storageEndpointSuffixField:
×
254
                        storageEndpointSuffix = v
×
255
                case ephemeralField:
×
256
                        ephemeralVol = strings.EqualFold(v, trueValue)
×
257
                case mountOptionsField:
×
258
                        ephemeralVolMountOptions = v
×
259
                case isHnsEnabledField:
×
260
                        isHnsEnabled = strings.EqualFold(v, trueValue)
×
261
                case pvcNamespaceKey:
×
262
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
263
                case pvcNameKey:
×
264
                        containerNameReplaceMap[pvcNameMetadata] = v
×
265
                case pvNameKey:
×
266
                        containerNameReplaceMap[pvNameMetadata] = v
×
267
                case mountPermissionsField:
4✔
268
                        if v != "" {
8✔
269
                                var err error
4✔
270
                                var perm uint64
4✔
271
                                if perm, err = strconv.ParseUint(v, 8, 32); err != nil {
5✔
272
                                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s", v))
1✔
273
                                }
1✔
274
                                if perm == 0 {
3✔
275
                                        performChmodOp = false
×
276
                                } else {
3✔
277
                                        mountPermissions = perm
3✔
278
                                }
3✔
279
                        }
280
                }
281
        }
282

283
        mnt, err := d.ensureMountPoint(targetPath, fs.FileMode(mountPermissions))
3✔
284
        if err != nil {
4✔
285
                return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", targetPath, err)
1✔
286
        }
1✔
287
        if mnt {
2✔
288
                klog.V(2).Infof("NodeStageVolume: volume %s is already mounted on %s", volumeID, targetPath)
×
289
                return &csi.NodeStageVolumeResponse{}, nil
×
290
        }
×
291

292
        _, accountName, _, containerName, authEnv, err := d.GetAuthEnv(ctx, volumeID, protocol, attrib, secrets)
2✔
293
        if err != nil {
2✔
294
                return nil, status.Errorf(codes.Internal, err.Error())
×
295
        }
×
296

297
        // replace pv/pvc name namespace metadata in subDir
298
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
2✔
299

2✔
300
        if strings.TrimSpace(storageEndpointSuffix) == "" {
4✔
301
                if d.cloud.Environment.StorageEndpointSuffix != "" {
2✔
302
                        storageEndpointSuffix = d.cloud.Environment.StorageEndpointSuffix
×
303
                } else {
2✔
304
                        storageEndpointSuffix = storage.DefaultBaseURL
2✔
305
                }
2✔
306
        }
307

308
        if strings.TrimSpace(serverAddress) == "" {
4✔
309
                // server address is "accountname.blob.core.windows.net" by default
2✔
310
                serverAddress = fmt.Sprintf("%s.blob.%s", accountName, storageEndpointSuffix)
2✔
311
        }
2✔
312

313
        if protocol == NFS {
3✔
314
                klog.V(2).Infof("target %v\nprotocol %v\n\nvolumeId %v\ncontext %v\nmountflags %v\nserverAddress %v",
1✔
315
                        targetPath, protocol, volumeID, attrib, mountFlags, serverAddress)
1✔
316

1✔
317
                source := fmt.Sprintf("%s:/%s/%s", serverAddress, accountName, containerName)
1✔
318
                mountOptions := util.JoinMountOptions(mountFlags, []string{"sec=sys,vers=3,nolock"})
1✔
319
                if err := wait.PollImmediate(1*time.Second, 2*time.Minute, func() (bool, error) {
2✔
320
                        return true, d.mounter.MountSensitive(source, targetPath, NFS, mountOptions, []string{})
1✔
321
                }); err != nil {
1✔
322
                        var helpLinkMsg string
×
323
                        if d.appendMountErrorHelpLink {
×
324
                                helpLinkMsg = "\nPlease refer to http://aka.ms/blobmounterror for possible causes and solutions for mount errors."
×
325
                        }
×
326
                        return nil, status.Error(codes.Internal, fmt.Sprintf("volume(%s) mount %q on %q failed with %v%s", volumeID, source, targetPath, err, helpLinkMsg))
×
327
                }
328

329
                if performChmodOp {
1✔
330
                        if err := chmodIfPermissionMismatch(targetPath, os.FileMode(mountPermissions)); err != nil {
×
331
                                return nil, status.Error(codes.Internal, err.Error())
×
332
                        }
×
333
                } else {
1✔
334
                        klog.V(2).Infof("skip chmod on targetPath(%s) since mountPermissions is set as 0", targetPath)
1✔
335
                }
1✔
336

337
                klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, targetPath)
1✔
338
                return &csi.NodeStageVolumeResponse{}, nil
1✔
339
        }
340

341
        // Get mountOptions that the volume will be formatted and mounted with
342
        mountOptions := mountFlags
1✔
343
        if ephemeralVol {
1✔
344
                mountOptions = util.JoinMountOptions(mountOptions, strings.Split(ephemeralVolMountOptions, ","))
×
345
        }
×
346
        if isHnsEnabled {
1✔
347
                mountOptions = util.JoinMountOptions(mountOptions, []string{"--use-adls=true"})
×
348
        }
×
349
        tmpPath := fmt.Sprintf("%s/%s", "/mnt", volumeID)
1✔
350
        if d.appendTimeStampInCacheDir {
1✔
351
                tmpPath += fmt.Sprintf("#%d", time.Now().Unix())
×
352
        }
×
353
        mountOptions = appendDefaultMountOptions(mountOptions, tmpPath, containerName)
1✔
354

1✔
355
        args := targetPath
1✔
356
        for _, opt := range mountOptions {
7✔
357
                args = args + " " + opt
6✔
358
        }
6✔
359

360
        klog.V(2).Infof("target %v\nprotocol %v\n\nvolumeId %v\ncontext %v\nmountflags %v\nmountOptions %v\nargs %v\nserverAddress %v",
1✔
361
                targetPath, protocol, volumeID, attrib, mountFlags, mountOptions, args, serverAddress)
1✔
362

1✔
363
        authEnv = append(authEnv, "AZURE_STORAGE_ACCOUNT="+accountName, "AZURE_STORAGE_BLOB_ENDPOINT="+serverAddress)
1✔
364
        if d.enableBlobMockMount {
2✔
365
                klog.Warningf("NodeStageVolume: mock mount on volumeID(%s), this is only for TESTING!!!", volumeID)
1✔
366
                if err := volumehelper.MakeDir(targetPath, os.FileMode(mountPermissions)); err != nil {
1✔
367
                        klog.Errorf("MakeDir failed on target: %s (%v)", targetPath, err)
×
368
                        return nil, status.Errorf(codes.Internal, err.Error())
×
369
                }
×
370
                return &csi.NodeStageVolumeResponse{}, nil
1✔
371
        }
372

373
        var output string
×
374
        if d.enableBlobfuseProxy {
×
375
                output, err = d.mountBlobfuseWithProxy(args, protocol, authEnv)
×
376
        } else {
×
377
                output, err = d.mountBlobfuseInsideDriver(args, protocol, authEnv)
×
378
        }
×
379

380
        if err != nil {
×
381
                var helpLinkMsg string
×
382
                if d.appendMountErrorHelpLink {
×
383
                        helpLinkMsg = "\nPlease refer to http://aka.ms/blobmounterror for possible causes and solutions for mount errors."
×
384
                }
×
385
                err = status.Errorf(codes.Internal, "Mount failed with error: %v, output: %v%s", err, output, helpLinkMsg)
×
386
                klog.Errorf("%v", err)
×
387
                notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
×
388
                if mntErr != nil {
×
389
                        klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
×
390
                        return nil, err
×
391
                }
×
392
                if !notMnt {
×
393
                        if mntErr = d.mounter.Unmount(targetPath); mntErr != nil {
×
394
                                klog.Errorf("Failed to unmount: %v", mntErr)
×
395
                                return nil, err
×
396
                        }
×
397
                        notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
×
398
                        if mntErr != nil {
×
399
                                klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
×
400
                                return nil, err
×
401
                        }
×
402
                        if !notMnt {
×
403
                                // This is very odd, we don't expect it.  We'll try again next sync loop.
×
404
                                klog.Errorf("%s is still mounted, despite call to unmount().  Will try again next sync loop.", targetPath)
×
405
                                return nil, err
×
406
                        }
×
407
                }
408
                os.Remove(targetPath)
×
409
                return nil, err
×
410
        }
411

412
        // wait a few seconds to make sure blobfuse mount is successful
413
        // please refer to https://github.com/Azure/azure-storage-fuse/pull/1088 for more details
414
        if err := waitForMount(targetPath, waitForMountInterval, waitForMountTimeout); err != nil {
×
415
                return nil, fmt.Errorf("failed to wait for mount: %w", err)
×
416
        }
×
417

418
        klog.V(2).Infof("volume(%s) mount on %q succeeded", volumeID, targetPath)
×
419
        return &csi.NodeStageVolumeResponse{}, nil
×
420
}
421

422
// NodeUnstageVolume unmount the volume from the staging path
423
func (d *Driver) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
3✔
424
        volumeID := req.GetVolumeId()
3✔
425
        if len(volumeID) == 0 {
4✔
426
                return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
1✔
427
        }
1✔
428

429
        stagingTargetPath := req.GetStagingTargetPath()
2✔
430
        if len(stagingTargetPath) == 0 {
3✔
431
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
432
        }
1✔
433

434
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
1✔
435
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
436
        }
×
437
        defer d.volumeLocks.Release(volumeID)
1✔
438

1✔
439
        klog.V(2).Infof("NodeUnstageVolume: volume %s unmounting on %s", volumeID, stagingTargetPath)
1✔
440
        err := mount.CleanupMountPoint(stagingTargetPath, d.mounter, true /*extensiveMountPointCheck*/)
1✔
441
        if err != nil {
1✔
442
                return nil, status.Errorf(codes.Internal, "failed to unmount staging target %q: %v", stagingTargetPath, err)
×
443
        }
×
444
        klog.V(2).Infof("NodeUnstageVolume: volume %s unmount on %s successfully", volumeID, stagingTargetPath)
1✔
445

1✔
446
        return &csi.NodeUnstageVolumeResponse{}, nil
1✔
447
}
448

449
// NodeGetCapabilities return the capabilities of the Node plugin
450
func (d *Driver) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
1✔
451
        return &csi.NodeGetCapabilitiesResponse{
1✔
452
                Capabilities: d.NSCap,
1✔
453
        }, nil
1✔
454
}
1✔
455

456
// NodeGetInfo return info of the node on which this plugin is running
457
func (d *Driver) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
1✔
458
        return &csi.NodeGetInfoResponse{
1✔
459
                NodeId: d.NodeID,
1✔
460
        }, nil
1✔
461
}
1✔
462

463
// NodeExpandVolume node expand volume
464
func (d *Driver) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
1✔
465
        return nil, status.Error(codes.Unimplemented, "NodeExpandVolume is not yet implemented")
1✔
466
}
1✔
467

468
// NodeGetVolumeStats get volume stats
469
func (d *Driver) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
4✔
470
        if len(req.VolumeId) == 0 {
5✔
471
                return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty")
1✔
472
        }
1✔
473
        if len(req.VolumePath) == 0 {
4✔
474
                return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume path was empty")
1✔
475
        }
1✔
476

477
        if _, err := os.Lstat(req.VolumePath); err != nil {
3✔
478
                if os.IsNotExist(err) {
2✔
479
                        return nil, status.Errorf(codes.NotFound, "path %s does not exist", req.VolumePath)
1✔
480
                }
1✔
481
                return nil, status.Errorf(codes.Internal, "failed to stat file %s: %v", req.VolumePath, err)
×
482
        }
483

484
        volumeMetrics, err := volume.NewMetricsStatFS(req.VolumePath).GetMetrics()
1✔
485
        if err != nil {
1✔
486
                return nil, status.Errorf(codes.Internal, "failed to get metrics: %v", err)
×
487
        }
×
488

489
        available, ok := volumeMetrics.Available.AsInt64()
1✔
490
        if !ok {
1✔
491
                return nil, status.Errorf(codes.Internal, "failed to transform volume available size(%v)", volumeMetrics.Available)
×
492
        }
×
493
        capacity, ok := volumeMetrics.Capacity.AsInt64()
1✔
494
        if !ok {
1✔
495
                return nil, status.Errorf(codes.Internal, "failed to transform volume capacity size(%v)", volumeMetrics.Capacity)
×
496
        }
×
497
        used, ok := volumeMetrics.Used.AsInt64()
1✔
498
        if !ok {
1✔
499
                return nil, status.Errorf(codes.Internal, "failed to transform volume used size(%v)", volumeMetrics.Used)
×
500
        }
×
501

502
        inodesFree, ok := volumeMetrics.InodesFree.AsInt64()
1✔
503
        if !ok {
1✔
504
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes free(%v)", volumeMetrics.InodesFree)
×
505
        }
×
506
        inodes, ok := volumeMetrics.Inodes.AsInt64()
1✔
507
        if !ok {
1✔
508
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes(%v)", volumeMetrics.Inodes)
×
509
        }
×
510
        inodesUsed, ok := volumeMetrics.InodesUsed.AsInt64()
1✔
511
        if !ok {
1✔
512
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes used(%v)", volumeMetrics.InodesUsed)
×
513
        }
×
514

515
        return &csi.NodeGetVolumeStatsResponse{
1✔
516
                Usage: []*csi.VolumeUsage{
1✔
517
                        {
1✔
518
                                Unit:      csi.VolumeUsage_BYTES,
1✔
519
                                Available: available,
1✔
520
                                Total:     capacity,
1✔
521
                                Used:      used,
1✔
522
                        },
1✔
523
                        {
1✔
524
                                Unit:      csi.VolumeUsage_INODES,
1✔
525
                                Available: inodesFree,
1✔
526
                                Total:     inodes,
1✔
527
                                Used:      inodesUsed,
1✔
528
                        },
1✔
529
                },
1✔
530
        }, nil
1✔
531
}
532

533
// ensureMountPoint: create mount point if not exists
534
// return <true, nil> if it's already a mounted point otherwise return <false, nil>
535
func (d *Driver) ensureMountPoint(target string, perm os.FileMode) (bool, error) {
11✔
536
        notMnt, err := d.mounter.IsLikelyNotMountPoint(target)
11✔
537
        if err != nil && !os.IsNotExist(err) {
13✔
538
                if IsCorruptedDir(target) {
2✔
539
                        notMnt = false
×
540
                        klog.Warningf("detected corrupted mount for targetPath [%s]", target)
×
541
                } else {
2✔
542
                        return !notMnt, err
2✔
543
                }
2✔
544
        }
545

546
        // Check all the mountpoints in case IsLikelyNotMountPoint
547
        // cannot handle --bind mount
548
        mountList, err := d.mounter.List()
9✔
549
        if err != nil {
9✔
550
                return !notMnt, err
×
551
        }
×
552

553
        targetAbs, err := filepath.Abs(target)
9✔
554
        if err != nil {
9✔
555
                return !notMnt, err
×
556
        }
×
557

558
        for _, mountPoint := range mountList {
9✔
559
                if mountPoint.Path == targetAbs {
×
560
                        notMnt = false
×
561
                        break
×
562
                }
563
        }
564

565
        if !notMnt {
11✔
566
                // testing original mount point, make sure the mount link is valid
2✔
567
                _, err := os.ReadDir(target)
2✔
568
                if err == nil {
3✔
569
                        klog.V(2).Infof("already mounted to target %s", target)
1✔
570
                        return !notMnt, nil
1✔
571
                }
1✔
572
                // mount link is invalid, now unmount and remount later
573
                klog.Warningf("ReadDir %s failed with %v, unmount this directory", target, err)
1✔
574
                if err := d.mounter.Unmount(target); err != nil {
1✔
575
                        klog.Errorf("Unmount directory %s failed with %v", target, err)
×
576
                        return !notMnt, err
×
577
                }
×
578
                notMnt = true
1✔
579
                return !notMnt, err
1✔
580
        }
581
        if err := volumehelper.MakeDir(target, perm); err != nil {
9✔
582
                klog.Errorf("MakeDir failed on target: %s (%v)", target, err)
2✔
583
                return !notMnt, err
2✔
584
        }
2✔
585
        return !notMnt, nil
5✔
586
}
587

588
func waitForMount(path string, intervel, timeout time.Duration) error {
2✔
589
        timeAfter := time.After(timeout)
2✔
590
        timeTick := time.Tick(intervel)
2✔
591

2✔
592
        for {
12✔
593
                select {
10✔
594
                case <-timeTick:
9✔
595
                        notMount, err := mount.New("").IsLikelyNotMountPoint(path)
9✔
596
                        if err != nil {
10✔
597
                                return err
1✔
598
                        }
1✔
599
                        if !notMount {
8✔
600
                                klog.V(2).Infof("blobfuse mount at %s success", path)
×
601
                                return nil
×
602
                        }
×
603
                case <-timeAfter:
1✔
604
                        return fmt.Errorf("timeout waiting for mount %s", path)
1✔
605
                }
606
        }
607
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc