• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 13386305256

18 Feb 2025 08:27AM UTC coverage: 77.861%. Remained the same
13386305256

Pull #1839

github

andyzhangx
feat: optimize azcopy perf in volume cloning scenario
Pull Request #1839: feat: optimize azcopy perf in volume cloning scenario

2286 of 2936 relevant lines covered (77.86%)

7.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.73
/pkg/util/util.go
1
/*
2
Copyright 2019 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package util
18

19
import (
20
        "fmt"
21
        "os"
22
        "os/exec"
23
        "regexp"
24
        "strconv"
25
        "strings"
26
        "sync"
27
        "time"
28

29
        "github.com/go-ini/ini"
30
        "github.com/pkg/errors"
31
        v1 "k8s.io/api/core/v1"
32
        "k8s.io/client-go/kubernetes"
33
        "k8s.io/client-go/rest"
34
        "k8s.io/client-go/tools/clientcmd"
35
        "k8s.io/klog/v2"
36
        "k8s.io/kubernetes/pkg/volume"
37
)
38

39
const (
	// GiB is the number of bytes in one gibibyte (2^30).
	GiB = 1024 * 1024 * 1024
	// TiB is the number of bytes in one tebibyte (2^40).
	TiB = 1024 * GiB
	// tagKeyValueDelimiter separates the key from the value inside one tag.
	tagKeyValueDelimiter = "="
)

// AzcopyJobState is the state of an azcopy job as reported by this package.
type AzcopyJobState string

const (
	// AzcopyJobError means the job list or job summary could not be
	// obtained or parsed.
	AzcopyJobError AzcopyJobState = "Error"
	// AzcopyJobNotFound means no completed or in-progress job was listed.
	AzcopyJobNotFound AzcopyJobState = "NotFound"
	// AzcopyJobRunning means a job with status "InProgress" was listed.
	AzcopyJobRunning AzcopyJobState = "Running"
	// AzcopyJobCompleted means a job with status "Completed" was listed.
	AzcopyJobCompleted AzcopyJobState = "Completed"
)

// RoundUpBytes rounds the volume size in bytes up to the next multiple of
// GiB and returns the result in bytes.
func RoundUpBytes(volumeSizeBytes int64) int64 {
	return roundUpSize(volumeSizeBytes, GiB) * GiB
}

// RoundUpGiB rounds the volume size in bytes up to the next multiple of GiB
// and returns the result in GiB.
func RoundUpGiB(volumeSizeBytes int64) int64 {
	return roundUpSize(volumeSizeBytes, GiB)
}

// BytesToGiB converts bytes to GiB using integer division, so any partial
// GiB is discarded.
func BytesToGiB(volumeSizeBytes int64) int64 {
	return volumeSizeBytes / GiB
}

// GiBToBytes converts GiB to bytes.
func GiBToBytes(volumeSizeGiB int64) int64 {
	return volumeSizeGiB * GiB
}

// roundUpSize calculates how many allocation units are needed to accommodate
// a volume of the given size. E.g. when a user wants a 1500MiB volume and
// volumes are allocated in gibibyte-sized chunks,
// roundUpSize(1500 * 1024*1024, 1024*1024*1024) returns '2'
// (2 GiB is the smallest allocatable volume that can hold 1500MiB).
//
// allocationUnitBytes must be non-zero; a zero unit divides by zero.
func roundUpSize(volumeSizeBytes int64, allocationUnitBytes int64) int64 {
	roundedUp := volumeSizeBytes / allocationUnitBytes
	// A positive remainder means one more (partially filled) unit is needed.
	if volumeSizeBytes%allocationUnitBytes > 0 {
		roundedUp++
	}
	return roundedUp
}
88

89
// GetMountOptions joins the given mount options into one string with a
// single space between consecutive options. It returns "" when options is
// nil or empty.
func GetMountOptions(options []string) string {
	return strings.Join(options, " ")
}
100

101
// MakeDir creates the directory pathname, including any missing parents,
// passing perm to os.MkdirAll. An error for which os.IsExist reports true is
// treated as success; any other error is returned unchanged.
func MakeDir(pathname string, perm os.FileMode) error {
	if err := os.MkdirAll(pathname, perm); err != nil && !os.IsExist(err) {
		return err
	}
	return nil
}
109

110
// LockMap provides one mutex per string entry. The embedded Mutex guards
// mutexMap itself; the mutexes stored in mutexMap guard the entries.
type LockMap struct {
	sync.Mutex
	mutexMap map[string]*sync.Mutex
}

// NewLockMap returns a new, empty lock map.
func NewLockMap() *LockMap {
	return &LockMap{
		mutexMap: make(map[string]*sync.Mutex),
	}
}

// LockEntry acquires the lock associated with entry, creating that lock on
// first use. It blocks until the entry lock is available.
func (lm *LockMap) LockEntry(entry string) {
	lm.Lock()
	// check if entry does not exist, then add entry
	if _, exists := lm.mutexMap[entry]; !exists {
		lm.addEntry(entry)
	}
	lm.Unlock()

	// The map lock is released before blocking on the entry lock so that
	// other entries (and UnlockEntry) are never held up by a busy entry.
	lm.lockEntry(entry)
}

// UnlockEntry releases the lock associated with entry. It does nothing when
// entry has never been locked through this map.
func (lm *LockMap) UnlockEntry(entry string) {
	lm.Lock()
	defer lm.Unlock()

	if _, exists := lm.mutexMap[entry]; !exists {
		return
	}
	lm.unlockEntry(entry)
}

// addEntry registers a fresh mutex for entry. The caller must hold the map
// lock.
func (lm *LockMap) addEntry(entry string) {
	lm.mutexMap[entry] = &sync.Mutex{}
}

// lockEntry locks the mutex registered for entry. The entry must already
// exist. The map is read under the map lock, because reading it while
// another goroutine runs addEntry would be a data race; the map lock is
// dropped again before blocking on the entry mutex.
func (lm *LockMap) lockEntry(entry string) {
	lm.Lock()
	mu := lm.mutexMap[entry]
	lm.Unlock()

	mu.Lock()
}

// unlockEntry unlocks the mutex registered for entry. The caller must hold
// the map lock and the entry must exist.
func (lm *LockMap) unlockEntry(entry string) {
	lm.mutexMap[entry].Unlock()
}
4✔
157

158
func ConvertTagsToMap(tags string, tagsDelimiter string) (map[string]string, error) {
10✔
159
        m := make(map[string]string)
10✔
160
        if tags == "" {
11✔
161
                return m, nil
1✔
162
        }
1✔
163
        if tagsDelimiter == "" {
14✔
164
                tagsDelimiter = ","
5✔
165
        }
5✔
166
        s := strings.Split(tags, tagsDelimiter)
9✔
167
        for _, tag := range s {
23✔
168
                kv := strings.SplitN(tag, tagKeyValueDelimiter, 2)
14✔
169
                if len(kv) != 2 {
16✔
170
                        return nil, fmt.Errorf("Tags '%s' are invalid, the format should like: 'key1=value1%skey2=value2'", tags, tagsDelimiter)
2✔
171
                }
2✔
172
                key := strings.TrimSpace(kv[0])
12✔
173
                if key == "" {
14✔
174
                        return nil, fmt.Errorf("Tags '%s' are invalid, the format should like: 'key1=value1%skey2=value2'", tags, tagsDelimiter)
2✔
175
                }
2✔
176
                value := strings.TrimSpace(kv[1])
10✔
177
                m[key] = value
10✔
178
        }
179
        return m, nil
5✔
180
}
181

182
// OsInfo holds the operating system identification returned by GetOSInfo.
type OsInfo struct {
	// Distro is the value of the ID key.
	Distro string
	// Version is the value of the VERSION_ID key.
	Version string
}
186

187
const (
	// keyID is the key whose value GetOSInfo stores in OsInfo.Distro.
	keyID = "ID"
	// keyVersionID is the key whose value GetOSInfo stores in OsInfo.Version.
	keyVersionID = "VERSION_ID"
)
191

192
func GetOSInfo(f interface{}) (*OsInfo, error) {
2✔
193
        cfg, err := ini.Load(f)
2✔
194
        if err != nil {
3✔
195
                return nil, errors.Wrapf(err, "failed to read %q", f)
1✔
196
        }
1✔
197

198
        oi := &OsInfo{}
1✔
199
        oi.Distro = cfg.Section("").Key(keyID).String()
1✔
200
        oi.Version = cfg.Section("").Key(keyVersionID).String()
1✔
201

1✔
202
        klog.V(2).Infof("get OS info: %v", oi)
1✔
203
        return oi, nil
1✔
204
}
205

206
// duplicatedSpaceRegexp matches one or more consecutive whitespace
// characters. It is compiled once instead of on every call.
var duplicatedSpaceRegexp = regexp.MustCompile(`\s+`)

// TrimDuplicatedSpace replaces every run of whitespace in s with a single
// space. Leading and trailing whitespace is collapsed to one space, not
// removed.
func TrimDuplicatedSpace(s string) string {
	return duplicatedSpaceRegexp.ReplaceAllString(s, " ")
}
1✔
211

212
// EXEC abstracts command execution. Azcopy holds one in its ExecCmd field,
// which lets the executor be swapped for a different implementation.
type EXEC interface {
	// RunCommand runs the given command string with the given extra
	// environment entries and returns the command output and error.
	RunCommand(string, []string) (string, error)
}

// ExecCommand is the EXEC implementation that runs commands through a shell.
type ExecCommand struct {
}

// Compile-time check that *ExecCommand satisfies EXEC.
var _ EXEC = (*ExecCommand)(nil)

// RunCommand runs cmdStr with "sh -c" and returns the combined standard
// output and standard error together with the command's error.
//
// When authEnv is non-empty, the command runs with the current process
// environment plus the entries of authEnv; otherwise cmd.Env is left unset.
//
// NOTE(review): cmdStr is interpreted by the shell, so callers must not build
// it from untrusted input without escaping.
func (ec *ExecCommand) RunCommand(cmdStr string, authEnv []string) (string, error) {
	cmd := exec.Command("sh", "-c", cmdStr)
	if len(authEnv) > 0 {
		cmd.Env = append(os.Environ(), authEnv...)
	}
	out, err := cmd.CombinedOutput()
	return string(out), err
}
227

228
type Azcopy struct {
229
        ExecCmd EXEC
230
}
231

232
// GetAzcopyJob get the azcopy job status if job existed
233
func (ac *Azcopy) GetAzcopyJob(dstBlobContainer string, authAzcopyEnv []string) (AzcopyJobState, string, error) {
7✔
234
        cmdStr := fmt.Sprintf("azcopy jobs list | grep %s -B 3", dstBlobContainer)
7✔
235
        // cmd output example:
7✔
236
        // JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9
7✔
237
        // Start Time: Monday, 07-Aug-23 03:29:54 UTC
7✔
238
        // Status: Completed (or Cancelled, InProgress)
7✔
239
        // Command: copy https://{accountName}.file.core.windows.net/{srcBlobContainer}{SAStoken} https://{accountName}.file.core.windows.net/{dstBlobContainer}{SAStoken} --recursive --check-length=false
7✔
240
        // --
7✔
241
        // JobId: b598cce3-9aa9-9640-7793-c2bf3c385a9a
7✔
242
        // Start Time: Wednesday, 09-Aug-23 09:09:03 UTC
7✔
243
        // Status: Cancelled
7✔
244
        // Command: copy https://{accountName}.file.core.windows.net/{srcBlobContainer}{SAStoken} https://{accountName}.file.core.windows.net/{dstBlobContainer}{SAStoken} --recursive --check-length=false
7✔
245
        if ac.ExecCmd == nil {
7✔
246
                ac.ExecCmd = &ExecCommand{}
×
247
        }
×
248
        out, err := ac.ExecCmd.RunCommand(cmdStr, authAzcopyEnv)
7✔
249
        // if grep command returns nothing, the exec will return exit status 1 error, so filter this error
7✔
250
        if err != nil && err.Error() != "exit status 1" {
8✔
251
                klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, AzcopyJobError)
1✔
252
                return AzcopyJobError, "", fmt.Errorf("couldn't list jobs in azcopy %v", err)
1✔
253
        }
1✔
254
        jobid, jobState, err := parseAzcopyJobList(out)
6✔
255
        if err != nil || jobState == AzcopyJobError {
7✔
256
                klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, jobState)
1✔
257
                return AzcopyJobError, "", fmt.Errorf("couldn't parse azcopy job list in azcopy %v", err)
1✔
258
        }
1✔
259
        if jobState == AzcopyJobCompleted {
6✔
260
                return jobState, "100.0", err
1✔
261
        }
1✔
262
        if jobid == "" {
5✔
263
                return jobState, "", err
1✔
264
        }
1✔
265
        cmdPercentStr := fmt.Sprintf("azcopy jobs show %s | grep Percent", jobid)
3✔
266
        // cmd out example:
3✔
267
        // Percent Complete (approx): 100.0
3✔
268
        summary, err := ac.ExecCmd.RunCommand(cmdPercentStr, authAzcopyEnv)
3✔
269
        if err != nil {
4✔
270
                klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, AzcopyJobError)
1✔
271
                return AzcopyJobError, "", fmt.Errorf("couldn't show jobs summary in azcopy %v", err)
1✔
272
        }
1✔
273
        jobState, percent, err := parseAzcopyJobShow(summary)
2✔
274
        if err != nil || jobState == AzcopyJobError {
3✔
275
                klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, jobState)
1✔
276
                return AzcopyJobError, "", fmt.Errorf("couldn't parse azcopy job show in azcopy %v", err)
1✔
277
        }
1✔
278
        return jobState, percent, nil
1✔
279
}
280

281
// TestListJobs test azcopy jobs list command with authAzcopyEnv
282
func (ac *Azcopy) TestListJobs(accountName, storageEndpointSuffix string, authAzcopyEnv []string) (string, error) {
×
283
        cmdStr := fmt.Sprintf("azcopy list %s", fmt.Sprintf("https://%s.blob.%s", accountName, storageEndpointSuffix))
×
284
        if ac.ExecCmd == nil {
×
285
                ac.ExecCmd = &ExecCommand{}
×
286
        }
×
287
        return ac.ExecCmd.RunCommand(cmdStr, authAzcopyEnv)
×
288
}
289

290
// parseAzcopyJobList parse command azcopy jobs list, get jobid and state from joblist
291
func parseAzcopyJobList(joblist string) (string, AzcopyJobState, error) {
12✔
292
        jobid := ""
12✔
293
        jobSegments := strings.Split(joblist, "JobId: ")
12✔
294
        if len(jobSegments) < 2 {
13✔
295
                return jobid, AzcopyJobNotFound, nil
1✔
296
        }
1✔
297
        jobSegments = jobSegments[1:]
11✔
298
        for _, job := range jobSegments {
22✔
299
                segments := strings.Split(job, "\n")
11✔
300
                if len(segments) < 4 {
13✔
301
                        return jobid, AzcopyJobError, fmt.Errorf("error parsing jobs list: %s", job)
2✔
302
                }
2✔
303
                statusSegments := strings.Split(segments[2], ": ")
9✔
304
                if len(statusSegments) < 2 {
10✔
305
                        return jobid, AzcopyJobError, fmt.Errorf("error parsing jobs list status: %s", segments[2])
1✔
306
                }
1✔
307
                status := statusSegments[1]
8✔
308
                switch status {
8✔
309
                case "InProgress":
4✔
310
                        jobid = segments[0]
4✔
311
                case "Completed":
2✔
312
                        return jobid, AzcopyJobCompleted, nil
2✔
313
                }
314
        }
315
        if jobid == "" {
8✔
316
                return jobid, AzcopyJobNotFound, nil
2✔
317
        }
2✔
318
        return jobid, AzcopyJobRunning, nil
4✔
319
}
320

321
// parseAzcopyJobShow parse command azcopy jobs show jobid, get job state and copy percent
322
func parseAzcopyJobShow(jobshow string) (AzcopyJobState, string, error) {
4✔
323
        segments := strings.Split(jobshow, ": ")
4✔
324
        if len(segments) < 2 {
6✔
325
                return AzcopyJobError, "", fmt.Errorf("error parsing jobs summary: %s in Percent Complete (approx)", jobshow)
2✔
326
        }
2✔
327
        return AzcopyJobRunning, strings.ReplaceAll(segments[1], "\n", ""), nil
2✔
328
}
329

330
func GetKubeClient(kubeconfig string, kubeAPIQPS float64, kubeAPIBurst int, userAgent string) (kubernetes.Interface, error) {
4✔
331
        var err error
4✔
332
        var kubeCfg *rest.Config
4✔
333
        if kubeconfig == "no-need-kubeconfig" {
5✔
334
                klog.V(2).Infof("kubeconfig is set as no-need-kubeconfig, kubeClient will be nil")
1✔
335
                return nil, nil
1✔
336
        }
1✔
337
        if kubeCfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig); err != nil {
5✔
338
                return nil, err
2✔
339
        }
2✔
340
        if kubeCfg == nil {
1✔
341
                if kubeCfg, err = rest.InClusterConfig(); err != nil {
×
342
                        return nil, err
×
343
                }
×
344
        }
345
        //kubeCfg should not be nil
346
        // set QPS and QPS Burst as higher values
347
        klog.V(2).Infof("set QPS(%f) and QPS Burst(%d) for driver kubeClient", float32(kubeAPIQPS), kubeAPIBurst)
1✔
348
        kubeCfg.QPS = float32(kubeAPIQPS)
1✔
349
        kubeCfg.Burst = kubeAPIBurst
1✔
350
        kubeCfg.UserAgent = userAgent
1✔
351
        return kubernetes.NewForConfig(kubeCfg)
1✔
352
}
353

354
type VolumeMounter struct {
355
        path       string
356
        attributes volume.Attributes
357
}
358

359
func (l *VolumeMounter) GetPath() string {
1✔
360
        return l.path
1✔
361
}
1✔
362

363
func (l *VolumeMounter) GetAttributes() volume.Attributes {
7✔
364
        return l.attributes
7✔
365
}
7✔
366

367
func (l *VolumeMounter) CanMount() error {
1✔
368
        return nil
1✔
369
}
1✔
370

371
func (l *VolumeMounter) SetUp(_ volume.MounterArgs) error {
1✔
372
        return nil
1✔
373
}
1✔
374

375
func (l *VolumeMounter) SetUpAt(_ string, _ volume.MounterArgs) error {
1✔
376
        return nil
1✔
377
}
1✔
378

379
func (l *VolumeMounter) GetMetrics() (*volume.Metrics, error) {
1✔
380
        return nil, nil
1✔
381
}
1✔
382

383
// SetVolumeOwnership would set gid for path recursively
384
func SetVolumeOwnership(path, gid, policy string) error {
6✔
385
        id, err := strconv.Atoi(gid)
6✔
386
        if err != nil {
8✔
387
                return fmt.Errorf("convert %s to int failed with %v", gid, err)
2✔
388
        }
2✔
389
        gidInt64 := int64(id)
4✔
390
        fsGroupChangePolicy := v1.FSGroupChangeOnRootMismatch
4✔
391
        if policy != "" {
6✔
392
                fsGroupChangePolicy = v1.PodFSGroupChangePolicy(policy)
2✔
393
        }
2✔
394
        return volume.SetVolumeOwnership(&VolumeMounter{path: path}, path, &gidInt64, &fsGroupChangePolicy, nil)
4✔
395
}
396

397
// ExecFunc is the operation run by WaitUntilTimeout. It returns the
// operation's error, or nil on success.
type ExecFunc func() (err error)

// TimeoutFunc is called by WaitUntilTimeout when the ExecFunc has not
// finished within the timeout. Its error is returned to the caller.
type TimeoutFunc func() (err error)

// WaitUntilTimeout runs execFunc in a goroutine and waits for it to finish.
//
// If execFunc finishes within timeout, its error is returned. Otherwise
// timeoutFunc is called and its error is returned. execFunc is not
// cancelled on timeout: it keeps running in the background and its result is
// discarded.
func WaitUntilTimeout(timeout time.Duration, execFunc ExecFunc, timeoutFunc TimeoutFunc) error {
	// The channel is buffered so the goroutine can always deliver its result
	// and exit, even when nobody is receiving any more after a timeout. An
	// unbuffered channel would leave it blocked forever.
	done := make(chan error, 1)

	// Start the exec function in a goroutine
	go func() {
		done <- execFunc()
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	// Wait for the function to complete or time out
	select {
	case err := <-done:
		return err
	case <-timer.C:
		return timeoutFunc()
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc