• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 17509749312

06 Sep 2025 04:05AM UTC coverage: 80.485%. Remained the same
17509749312

push

github

web-flow
Merge pull request #2167 from kubernetes-sigs/dependabot/go_modules/github.com/onsi/ginkgo/v2-2.25.3

chore(deps): bump github.com/onsi/ginkgo/v2 from 2.25.2 to 2.25.3

2454 of 3049 relevant lines covered (80.49%)

8.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.88
/pkg/util/util.go
1
/*
2
Copyright 2019 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package util
18

19
import (
20
        "fmt"
21
        "os"
22
        "os/exec"
23
        "regexp"
24
        "strconv"
25
        "strings"
26
        "sync"
27
        "time"
28

29
        "github.com/go-ini/ini"
30
        "github.com/pkg/errors"
31
        v1 "k8s.io/api/core/v1"
32
        "k8s.io/client-go/kubernetes"
33
        "k8s.io/client-go/rest"
34
        "k8s.io/client-go/tools/clientcmd"
35
        "k8s.io/klog/v2"
36
        "k8s.io/kubernetes/pkg/volume"
37
)
38

39
// Byte-size units and the separator used between a tag's key and its value.
const (
	// GiB is the number of bytes in one gibibyte (2^30).
	GiB = 1 << 30
	// TiB is the number of bytes in one tebibyte (1024 GiB).
	TiB = GiB << 10
	// tagKeyValueDelimiter separates the key from the value inside one tag.
	tagKeyValueDelimiter = "="
)
44

45
// AzcopyJobState is the state of an azcopy job as reported by this package.
type AzcopyJobState string

// Known azcopy job states. Each value is the literal string form of the state.
const (
	// AzcopyJobError marks a failure to obtain or parse the job status.
	AzcopyJobError AzcopyJobState = "Error"
	// AzcopyJobNotFound marks that no matching job was found.
	AzcopyJobNotFound AzcopyJobState = "NotFound"
	// AzcopyJobRunning marks a job that is still in progress.
	AzcopyJobRunning AzcopyJobState = "Running"
	// AzcopyJobCompleted marks a job that completed.
	AzcopyJobCompleted AzcopyJobState = "Completed"
	// AzcopyJobCompletedWithErrors marks a job that completed with errors.
	AzcopyJobCompletedWithErrors AzcopyJobState = "CompletedWithErrors"
	// AzcopyJobCompletedWithSkipped marks a job that completed with skipped items.
	AzcopyJobCompletedWithSkipped AzcopyJobState = "CompletedWithSkipped"
	// AzcopyJobCompletedWithErrorsAndSkipped marks a job that completed with
	// both errors and skipped items.
	AzcopyJobCompletedWithErrorsAndSkipped AzcopyJobState = "CompletedWithErrorsAndSkipped"
)
56

57
// RoundUpBytes rounds up the volume size in bytes up to multiplications of GiB
58
// in the unit of Bytes
59
func RoundUpBytes(volumeSizeBytes int64) int64 {
1✔
60
        return roundUpSize(volumeSizeBytes, GiB) * GiB
1✔
61
}
1✔
62

63
// RoundUpGiB rounds up the volume size in bytes up to multiplications of GiB
64
// in the unit of GiB
65
func RoundUpGiB(volumeSizeBytes int64) int64 {
1✔
66
        return roundUpSize(volumeSizeBytes, GiB)
1✔
67
}
1✔
68

69
// BytesToGiB conversts Bytes to GiB
70
func BytesToGiB(volumeSizeBytes int64) int64 {
1✔
71
        return volumeSizeBytes / GiB
1✔
72
}
1✔
73

74
// GiBToBytes converts GiB to Bytes
75
func GiBToBytes(volumeSizeGiB int64) int64 {
1✔
76
        return volumeSizeGiB * GiB
1✔
77
}
1✔
78

79
// roundUpSize returns how many allocation units of allocationUnitBytes are
// needed to hold volumeSizeBytes. For example, a 1500MiB request with a
// GiB-sized allocation unit,
// roundUpSize(1500 * 1024*1024, 1024*1024*1024), returns 2
// (2 GiB is the smallest allocation that can hold 1500MiB).
func roundUpSize(volumeSizeBytes int64, allocationUnitBytes int64) int64 {
	units := volumeSizeBytes / allocationUnitBytes
	leftover := volumeSizeBytes % allocationUnitBytes
	if leftover <= 0 {
		// The size fits exactly (or is not positive): no extra unit needed.
		return units
	}
	return units + 1
}
91

92
// GetMountOptions returns the given options joined into one string, with a
// single space between consecutive options. An empty or nil slice yields "".
func GetMountOptions(options []string) string {
	// strings.Join sizes the result once instead of re-allocating the
	// accumulated string for every option.
	return strings.Join(options, " ")
}
103

104
// MakeDir creates pathname, including any missing parent directories, with
// permission bits perm by calling os.MkdirAll. An error for which os.IsExist
// reports true is treated as success; any other error is returned unchanged.
func MakeDir(pathname string, perm os.FileMode) error {
	err := os.MkdirAll(pathname, perm)
	if err == nil || os.IsExist(err) {
		return nil
	}
	return err
}
112

113
// LockMap provides one mutex per string entry. The embedded sync.Mutex guards
// mutexMap itself; the per-entry mutexes stored in the map are what callers
// actually hold between LockEntry and UnlockEntry.
type LockMap struct {
	sync.Mutex
	mutexMap map[string]*sync.Mutex
}

// NewLockMap returns a new, empty lock map.
func NewLockMap() *LockMap {
	return &LockMap{
		mutexMap: make(map[string]*sync.Mutex),
	}
}

// LockEntry acquires the lock associated with entry, creating that lock first
// if the entry has not been seen before. It blocks until the lock is available.
func (lm *LockMap) LockEntry(entry string) {
	lm.Lock()
	// Add the entry if it does not exist yet.
	if _, exists := lm.mutexMap[entry]; !exists {
		lm.addEntry(entry)
	}
	lm.Unlock()

	// Block on the entry lock without holding the map lock, so that
	// UnlockEntry (which needs the map lock) can still make progress.
	lm.lockEntry(entry)
}

// UnlockEntry releases the lock associated with entry. It is a no-op when the
// entry was never added to the map.
func (lm *LockMap) UnlockEntry(entry string) {
	lm.Lock()
	defer lm.Unlock()

	if _, exists := lm.mutexMap[entry]; !exists {
		return
	}
	lm.unlockEntry(entry)
}

// addEntry registers a fresh mutex for entry. The caller must hold the map lock.
func (lm *LockMap) addEntry(entry string) {
	lm.mutexMap[entry] = &sync.Mutex{}
}

// lockEntry locks the mutex registered for entry. The caller must NOT hold the
// map lock: the lookup takes it briefly, because reading the map while another
// goroutine inserts into it is a data race, and the map lock is released again
// before blocking on the entry mutex.
func (lm *LockMap) lockEntry(entry string) {
	lm.Lock()
	entryMutex := lm.mutexMap[entry]
	lm.Unlock()

	entryMutex.Lock()
}

// unlockEntry unlocks the mutex registered for entry. The caller must hold the
// map lock.
func (lm *LockMap) unlockEntry(entry string) {
	lm.mutexMap[entry].Unlock()
}
4✔
160

161
func ConvertTagsToMap(tags string, tagsDelimiter string) (map[string]string, error) {
10✔
162
        m := make(map[string]string)
10✔
163
        if tags == "" {
11✔
164
                return m, nil
1✔
165
        }
1✔
166
        if tagsDelimiter == "" {
14✔
167
                tagsDelimiter = ","
5✔
168
        }
5✔
169
        s := strings.Split(tags, tagsDelimiter)
9✔
170
        for _, tag := range s {
23✔
171
                kv := strings.SplitN(tag, tagKeyValueDelimiter, 2)
14✔
172
                if len(kv) != 2 {
16✔
173
                        return nil, fmt.Errorf("Tags '%s' are invalid, the format should like: 'key1=value1%skey2=value2'", tags, tagsDelimiter)
2✔
174
                }
2✔
175
                key := strings.TrimSpace(kv[0])
12✔
176
                if key == "" {
14✔
177
                        return nil, fmt.Errorf("Tags '%s' are invalid, the format should like: 'key1=value1%skey2=value2'", tags, tagsDelimiter)
2✔
178
                }
2✔
179
                value := strings.TrimSpace(kv[1])
10✔
180
                m[key] = value
10✔
181
        }
182
        return m, nil
5✔
183
}
184

185
type OsInfo struct {
186
        Distro  string
187
        Version string
188
}
189

190
const (
191
        keyID        = "ID"
192
        keyVersionID = "VERSION_ID"
193
)
194

195
func GetOSInfo(f interface{}) (*OsInfo, error) {
2✔
196
        cfg, err := ini.Load(f)
2✔
197
        if err != nil {
3✔
198
                return nil, errors.Wrapf(err, "failed to read %q", f)
1✔
199
        }
1✔
200

201
        oi := &OsInfo{}
1✔
202
        oi.Distro = cfg.Section("").Key(keyID).String()
1✔
203
        oi.Version = cfg.Section("").Key(keyVersionID).String()
1✔
204

1✔
205
        klog.V(2).Infof("get OS info: %v", oi)
1✔
206
        return oi, nil
1✔
207
}
208

209
// duplicatedSpaceRegexp matches a run of one or more whitespace characters.
// It is compiled once at package initialization instead of on every call.
var duplicatedSpaceRegexp = regexp.MustCompile(`\s+`)

// TrimDuplicatedSpace replaces every run of whitespace in s (spaces, tabs,
// newlines, ...) with a single space. Leading and trailing whitespace is
// collapsed to one space as well, not removed.
func TrimDuplicatedSpace(s string) string {
	return duplicatedSpaceRegexp.ReplaceAllString(s, " ")
}
1✔
214

215
// EXEC runs a command string with optional extra environment entries and
// returns the command's output.
type EXEC interface {
	RunCommand(string, []string) (string, error)
}

// ExecCommand is the EXEC implementation that runs commands through the shell.
type ExecCommand struct{}

// RunCommand executes cmdStr with "sh -c" and returns its combined standard
// output and standard error as a string. When authEnv is non-empty, its
// entries are appended to the current process environment for the command.
func (ec *ExecCommand) RunCommand(cmdStr string, authEnv []string) (string, error) {
	shell := exec.Command("sh", "-c", cmdStr)
	if len(authEnv) != 0 {
		environment := os.Environ()
		shell.Env = append(environment, authEnv...)
	}
	output, err := shell.CombinedOutput()
	return string(output), err
}
230

231
type Azcopy struct {
232
        ExecCmd EXEC
233
}
234

235
// GetAzcopyJob get the azcopy job status if job existed
236
func (ac *Azcopy) GetAzcopyJob(dstBlobContainer string, authAzcopyEnv []string) (AzcopyJobState, string, error) {
10✔
237
        cmdStr := fmt.Sprintf("azcopy jobs list | grep %s -B 3", dstBlobContainer)
10✔
238
        // cmd output example:
10✔
239
        // JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9
10✔
240
        // Start Time: Monday, 07-Aug-23 03:29:54 UTC
10✔
241
        // Status: Completed (or Cancelled, InProgress)
10✔
242
        // Command: copy https://{accountName}.file.core.windows.net/{srcBlobContainer}{SAStoken} https://{accountName}.file.core.windows.net/{dstBlobContainer}{SAStoken} --recursive --check-length=false
10✔
243
        // --
10✔
244
        // JobId: b598cce3-9aa9-9640-7793-c2bf3c385a9a
10✔
245
        // Start Time: Wednesday, 09-Aug-23 09:09:03 UTC
10✔
246
        // Status: Cancelled
10✔
247
        // Command: copy https://{accountName}.file.core.windows.net/{srcBlobContainer}{SAStoken} https://{accountName}.file.core.windows.net/{dstBlobContainer}{SAStoken} --recursive --check-length=false
10✔
248
        out, err := ac.ExecCmd.RunCommand(cmdStr, authAzcopyEnv)
10✔
249
        // if grep command returns nothing, the exec will return exit status 1 error, so filter this error
10✔
250
        if err != nil && err.Error() != "exit status 1" {
11✔
251
                klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, AzcopyJobError)
1✔
252
                return AzcopyJobError, "", fmt.Errorf("couldn't list jobs in azcopy %v", err)
1✔
253
        }
1✔
254
        jobid, jobState, err := parseAzcopyJobList(out)
9✔
255
        if err != nil || jobState == AzcopyJobError {
10✔
256
                klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, jobState)
1✔
257
                return AzcopyJobError, "", fmt.Errorf("couldn't parse azcopy job list in azcopy %v", err)
1✔
258
        }
1✔
259
        if jobState == AzcopyJobCompleted || jobState == AzcopyJobCompletedWithErrors || jobState == AzcopyJobCompletedWithSkipped || jobState == AzcopyJobCompletedWithErrorsAndSkipped {
12✔
260
                return jobState, "100.0", err
4✔
261
        }
4✔
262
        if jobid == "" {
5✔
263
                return jobState, "", err
1✔
264
        }
1✔
265
        cmdPercentStr := fmt.Sprintf("azcopy jobs show %s | grep Percent", jobid)
3✔
266
        // cmd out example:
3✔
267
        // Percent Complete (approx): 100.0
3✔
268
        summary, err := ac.ExecCmd.RunCommand(cmdPercentStr, authAzcopyEnv)
3✔
269
        if err != nil {
4✔
270
                klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, AzcopyJobError)
1✔
271
                return AzcopyJobError, "", fmt.Errorf("couldn't show jobs summary in azcopy %v", err)
1✔
272
        }
1✔
273
        jobState, percent, err := parseAzcopyJobShow(summary)
2✔
274
        if err != nil || jobState == AzcopyJobError {
3✔
275
                klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, jobState)
1✔
276
                return AzcopyJobError, "", fmt.Errorf("couldn't parse azcopy job show in azcopy %v", err)
1✔
277
        }
1✔
278
        return jobState, percent, nil
1✔
279
}
280

281
func (ac *Azcopy) CleanJobs() (string, error) {
2✔
282
        return ac.ExecCmd.RunCommand("azcopy jobs clean", nil)
2✔
283
}
2✔
284

285
// parseAzcopyJobList parse command azcopy jobs list, get jobid and state from joblist
286
func parseAzcopyJobList(joblist string) (string, AzcopyJobState, error) {
18✔
287
        jobid := ""
18✔
288
        jobSegments := strings.Split(joblist, "JobId: ")
18✔
289
        if len(jobSegments) < 2 {
19✔
290
                return jobid, AzcopyJobNotFound, nil
1✔
291
        }
1✔
292
        jobSegments = jobSegments[1:]
17✔
293
        for _, job := range jobSegments {
34✔
294
                segments := strings.Split(job, "\n")
17✔
295
                if len(segments) < 4 {
19✔
296
                        return jobid, AzcopyJobError, fmt.Errorf("error parsing jobs list: %s", job)
2✔
297
                }
2✔
298
                statusSegments := strings.Split(segments[2], ": ")
15✔
299
                if len(statusSegments) < 2 {
16✔
300
                        return jobid, AzcopyJobError, fmt.Errorf("error parsing jobs list status: %s", segments[2])
1✔
301
                }
1✔
302
                status := statusSegments[1]
14✔
303
                switch status {
14✔
304
                case "InProgress":
4✔
305
                        jobid = segments[0]
4✔
306
                case "Completed":
2✔
307
                        return jobid, AzcopyJobCompleted, nil
2✔
308
                case "CompletedWithErrors":
2✔
309
                        return jobid, AzcopyJobCompletedWithErrors, nil
2✔
310
                case "CompletedWithSkipped":
2✔
311
                        return jobid, AzcopyJobCompletedWithSkipped, nil
2✔
312
                case "CompletedWithErrorsAndSkipped":
2✔
313
                        return jobid, AzcopyJobCompletedWithErrorsAndSkipped, nil
2✔
314
                }
315
        }
316
        if jobid == "" {
8✔
317
                return jobid, AzcopyJobNotFound, nil
2✔
318
        }
2✔
319
        return jobid, AzcopyJobRunning, nil
4✔
320
}
321

322
// parseAzcopyJobShow parse command azcopy jobs show jobid, get job state and copy percent
323
func parseAzcopyJobShow(jobshow string) (AzcopyJobState, string, error) {
4✔
324
        segments := strings.Split(jobshow, ": ")
4✔
325
        if len(segments) < 2 {
6✔
326
                return AzcopyJobError, "", fmt.Errorf("error parsing jobs summary: %s in Percent Complete (approx)", jobshow)
2✔
327
        }
2✔
328
        return AzcopyJobRunning, strings.ReplaceAll(segments[1], "\n", ""), nil
2✔
329
}
330

331
func GetKubeClient(kubeconfig string, kubeAPIQPS float64, kubeAPIBurst int, userAgent string) (kubernetes.Interface, error) {
4✔
332
        var err error
4✔
333
        var kubeCfg *rest.Config
4✔
334
        if kubeconfig == "no-need-kubeconfig" {
5✔
335
                klog.V(2).Infof("kubeconfig is set as no-need-kubeconfig, kubeClient will be nil")
1✔
336
                return nil, nil
1✔
337
        }
1✔
338
        if kubeCfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig); err != nil {
5✔
339
                return nil, err
2✔
340
        }
2✔
341
        if kubeCfg == nil {
1✔
342
                if kubeCfg, err = rest.InClusterConfig(); err != nil {
×
343
                        return nil, err
×
344
                }
×
345
        }
346
        //kubeCfg should not be nil
347
        // set QPS and QPS Burst as higher values
348
        klog.V(2).Infof("set QPS(%f) and QPS Burst(%d) for driver kubeClient", float32(kubeAPIQPS), kubeAPIBurst)
1✔
349
        kubeCfg.QPS = float32(kubeAPIQPS)
1✔
350
        kubeCfg.Burst = kubeAPIBurst
1✔
351
        kubeCfg.UserAgent = userAgent
1✔
352
        return kubernetes.NewForConfig(kubeCfg)
1✔
353
}
354

355
type VolumeMounter struct {
356
        path       string
357
        attributes volume.Attributes
358
}
359

360
func (l *VolumeMounter) GetPath() string {
1✔
361
        return l.path
1✔
362
}
1✔
363

364
func (l *VolumeMounter) GetAttributes() volume.Attributes {
7✔
365
        return l.attributes
7✔
366
}
7✔
367

368
func (l *VolumeMounter) CanMount() error {
1✔
369
        return nil
1✔
370
}
1✔
371

372
func (l *VolumeMounter) SetUp(_ volume.MounterArgs) error {
1✔
373
        return nil
1✔
374
}
1✔
375

376
func (l *VolumeMounter) SetUpAt(_ string, _ volume.MounterArgs) error {
1✔
377
        return nil
1✔
378
}
1✔
379

380
func (l *VolumeMounter) GetMetrics() (*volume.Metrics, error) {
1✔
381
        return nil, nil
1✔
382
}
1✔
383

384
// SetVolumeOwnership would set gid for path recursively
385
func SetVolumeOwnership(path, gid, policy string) error {
6✔
386
        id, err := strconv.Atoi(gid)
6✔
387
        if err != nil {
8✔
388
                return fmt.Errorf("convert %s to int failed with %v", gid, err)
2✔
389
        }
2✔
390
        gidInt64 := int64(id)
4✔
391
        fsGroupChangePolicy := v1.FSGroupChangeOnRootMismatch
4✔
392
        if policy != "" {
6✔
393
                fsGroupChangePolicy = v1.PodFSGroupChangePolicy(policy)
2✔
394
        }
2✔
395
        return volume.NewVolumeOwnership(&VolumeMounter{path: path}, path, &gidInt64, &fsGroupChangePolicy, nil).ChangePermissions()
4✔
396
}
397

398
// ExecFunc is the operation WaitUntilTimeout runs; it returns the operation's
// error, if any.
type ExecFunc func() (err error)

// TimeoutFunc is invoked by WaitUntilTimeout when the ExecFunc has not
// finished in time; its return value becomes WaitUntilTimeout's result.
type TimeoutFunc func() (err error)

// WaitUntilTimeout runs execFunc in a separate goroutine and waits for it for
// at most timeout. If execFunc finishes first, its error is returned. Otherwise
// timeoutFunc is called and its result is returned; execFunc is not cancelled
// and keeps running in the background until it returns on its own.
func WaitUntilTimeout(timeout time.Duration, execFunc ExecFunc, timeoutFunc TimeoutFunc) error {
	// Buffered with capacity 1 so the goroutine can always deliver its result
	// and exit, even when nobody is receiving any more after a timeout.
	result := make(chan error, 1)
	go func() {
		result <- execFunc()
	}()

	// An explicit timer (rather than time.After) lets us release it as soon as
	// execFunc finishes instead of keeping it alive for the full timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case err := <-result:
		return err
	case <-timer.C:
		return timeoutFunc()
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc