• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jstaf / onedriver / 6539229776

16 Oct 2023 09:05PM UTC coverage: 71.743% (-2.4%) from 74.097%
6539229776

Pull #357

github

jstaf
0.14.0-2 dummy release to force gpg key refresh on OBS
Pull Request #357: 0.14.0-2 dummy release to force gpg key refresh on OBS

1828 of 2548 relevant lines covered (71.74%)

172113.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.56
/fs/upload_manager.go
1
package fs
2

3
import (
4
        "encoding/json"
5
        "time"
6

7
        "github.com/jstaf/onedriver/fs/graph"
8
        "github.com/rs/zerolog/log"
9
        bolt "go.etcd.io/bbolt"
10
)
11

12
const maxUploadsInFlight = 5
13

14
var bucketUploads = []byte("uploads")
15

16
// UploadManager is used to manage and retry uploads.
17
type UploadManager struct {
18
        queue         chan *UploadSession
19
        deletionQueue chan string
20
        sessions      map[string]*UploadSession
21
        inFlight      uint8 // number of sessions in flight
22
        auth          *graph.Auth
23
        fs            *Filesystem
24
        db            *bolt.DB
25
}
26

27
// NewUploadManager creates a new queue/thread for uploads
28
func NewUploadManager(duration time.Duration, db *bolt.DB, fs *Filesystem, auth *graph.Auth) *UploadManager {
10✔
29
        manager := UploadManager{
10✔
30
                queue:         make(chan *UploadSession),
10✔
31
                deletionQueue: make(chan string, 1000), // FIXME - why does this chan need to be buffered now???
10✔
32
                sessions:      make(map[string]*UploadSession),
10✔
33
                auth:          auth,
10✔
34
                db:            db,
10✔
35
                fs:            fs,
10✔
36
        }
10✔
37
        db.View(func(tx *bolt.Tx) error {
20✔
38
                // Add any incomplete sessions from disk - any sessions here were never
10✔
39
                // finished. The most likely cause of this is that the user shut off
10✔
40
                // their computer or closed the program after starting the upload.
10✔
41
                b := tx.Bucket(bucketUploads)
10✔
42
                if b == nil {
19✔
43
                        // bucket does not exist yet, bail out early
9✔
44
                        return nil
9✔
45
                }
9✔
46
                return b.ForEach(func(key []byte, val []byte) error {
2✔
47
                        session := &UploadSession{}
1✔
48
                        err := json.Unmarshal(val, session)
1✔
49
                        if err != nil {
1✔
50
                                log.Error().Err(err).Msg("Failure restoring upload sessions from disk.")
×
51
                                return err
×
52
                        }
×
53
                        if session.getState() != uploadNotStarted {
1✔
54
                                manager.inFlight++
×
55
                        }
×
56
                        session.cancel(auth) // uploads are currently non-resumable
1✔
57
                        manager.sessions[session.ID] = session
1✔
58
                        return nil
1✔
59
                })
60
        })
61
        go manager.uploadLoop(duration)
10✔
62
        return &manager
10✔
63
}
64

65
// uploadLoop manages the deduplication and tracking of uploads
66
func (u *UploadManager) uploadLoop(duration time.Duration) {
10✔
67
        ticker := time.NewTicker(duration)
10✔
68
        for {
628✔
69
                select {
618✔
70
                case session := <-u.queue: // new sessions
48✔
71
                        // deduplicate sessions for the same item
48✔
72
                        if old, exists := u.sessions[session.ID]; exists {
58✔
73
                                old.cancel(u.auth)
10✔
74
                        }
10✔
75
                        contents, _ := json.Marshal(session)
48✔
76
                        u.db.Batch(func(tx *bolt.Tx) error {
96✔
77
                                // persist to disk in case the user shuts off their computer or
48✔
78
                                // kills onedriver prematurely
48✔
79
                                b, _ := tx.CreateBucketIfNotExists(bucketUploads)
48✔
80
                                return b.Put([]byte(session.ID), contents)
48✔
81
                        })
48✔
82
                        u.sessions[session.ID] = session
48✔
83

84
                case cancelID := <-u.deletionQueue: // remove uploads for deleted items
57✔
85
                        u.finishUpload(cancelID)
57✔
86

87
                case <-ticker.C: // periodically start uploads, or remove them if done/failed
503✔
88
                        for _, session := range u.sessions {
568✔
89
                                switch session.getState() {
65✔
90
                                case uploadNotStarted:
34✔
91
                                        // max active upload sessions are capped at this limit for faster
34✔
92
                                        // uploads of individual files and also to prevent possible server-
34✔
93
                                        // side throttling that can cause errors.
34✔
94
                                        if u.inFlight < maxUploadsInFlight {
67✔
95
                                                u.inFlight++
33✔
96
                                                go session.Upload(u.auth)
33✔
97
                                        }
33✔
98

99
                                case uploadErrored:
×
100
                                        session.retries++
×
101
                                        if session.retries > 5 {
×
102
                                                log.Error().
×
103
                                                        Str("id", session.ID).
×
104
                                                        Str("name", session.Name).
×
105
                                                        Err(session).
×
106
                                                        Int("retries", session.retries).
×
107
                                                        Msg("Upload session failed too many times, cancelling session.")
×
108
                                                u.finishUpload(session.ID)
×
109
                                        }
×
110

111
                                        log.Warn().
×
112
                                                Str("id", session.ID).
×
113
                                                Str("name", session.Name).
×
114
                                                Err(session).
×
115
                                                Msg("Upload session failed, will retry from beginning.")
×
116
                                        session.cancel(u.auth) // cancel large sessions
×
117
                                        session.setState(uploadNotStarted, nil)
×
118

119
                                case uploadComplete:
30✔
120
                                        log.Info().
30✔
121
                                                Str("id", session.ID).
30✔
122
                                                Str("oldID", session.OldID).
30✔
123
                                                Str("name", session.Name).
30✔
124
                                                Msg("Upload completed!")
30✔
125

30✔
126
                                        // ID changed during upload, move to new ID
30✔
127
                                        if session.OldID != session.ID {
54✔
128
                                                err := u.fs.MoveID(session.OldID, session.ID)
24✔
129
                                                if err != nil {
24✔
130
                                                        log.Error().
×
131
                                                                Str("id", session.ID).
×
132
                                                                Str("oldID", session.OldID).
×
133
                                                                Str("name", session.Name).
×
134
                                                                Err(err).
×
135
                                                                Msg("Could not move inode to new ID!")
×
136
                                                }
×
137
                                        }
138

139
                                        // inode will exist at the new ID now, but we check if inode
140
                                        // is nil to see if the item has been deleted since upload start
141
                                        if inode := u.fs.GetID(session.ID); inode != nil {
60✔
142
                                                inode.Lock()
30✔
143
                                                inode.DriveItem.ETag = session.ETag
30✔
144
                                                inode.Unlock()
30✔
145
                                        }
30✔
146

147
                                        // the old ID is the one that was used to add it to the queue.
148
                                        // cleanup the session.
149
                                        u.finishUpload(session.OldID)
30✔
150
                                }
151
                        }
152
                }
153
        }
154
}
155

156
// QueueUpload queues an item for upload.
157
func (u *UploadManager) QueueUpload(inode *Inode) error {
48✔
158
        data := u.fs.getInodeContent(inode)
48✔
159
        session, err := NewUploadSession(inode, data)
48✔
160
        if err == nil {
96✔
161
                u.queue <- session
48✔
162
        }
48✔
163
        return err
48✔
164
}
165

166
// CancelUpload is used to kill any pending uploads for a session
167
func (u *UploadManager) CancelUpload(id string) {
57✔
168
        u.deletionQueue <- id
57✔
169
}
57✔
170

171
// finishUpload is an internal method that gets called when a session is
172
// completed. It cancels the session if one was in progress, and then deletes
173
// it from both memory and disk.
174
func (u *UploadManager) finishUpload(id string) {
87✔
175
        if session, exists := u.sessions[id]; exists {
126✔
176
                session.cancel(u.auth)
39✔
177
        }
39✔
178
        u.db.Batch(func(tx *bolt.Tx) error {
174✔
179
                if b := tx.Bucket(bucketUploads); b != nil {
170✔
180
                        b.Delete([]byte(id))
83✔
181
                }
83✔
182
                return nil
87✔
183
        })
184
        if u.inFlight > 0 {
120✔
185
                u.inFlight--
33✔
186
        }
33✔
187
        delete(u.sessions, id)
87✔
188
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc