• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5970575607

25 Aug 2023 01:00AM UTC coverage: 75.448% (-0.01%) from 75.46%
5970575607

push

github

web-flow
Bump thiserror from 1.0.39 to 1.0.47 (#1914)

Bumps [thiserror](https://github.com/dtolnay/thiserror) from 1.0.39 to 1.0.47.
- [Release notes](https://github.com/dtolnay/thiserror/releases)
- [Commits](https://github.com/dtolnay/thiserror/compare/1.0.39...1.0.47)

---
updated-dependencies:
- dependency-name: thiserror
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

46985 of 62275 relevant lines covered (75.45%)

48100.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.45
/dozer-log/src/storage/queue.rs
1
use std::{
2
    collections::{hash_map::Entry, HashMap},
3
    num::NonZeroU16,
4
    time::Duration,
5
};
6

7
use dozer_types::{
8
    log::{debug, error},
9
    thiserror::{self, Error},
10
};
11
use nonzero_ext::nonzero;
12
use tokio::{
13
    sync::{
14
        mpsc::{self, error::SendError, Receiver, Sender},
15
        oneshot,
16
    },
17
    task::JoinHandle,
18
};
19

20
use super::Storage;
21

22
#[derive(Debug, Clone)]
×
23
pub struct Queue {
24
    sender: Sender<Request>,
25
}
26

27
impl Queue {
28
    pub fn new(storage: Box<dyn Storage>, capacity: usize) -> (Self, JoinHandle<()>) {
1,269✔
29
        let (sender, requests) = mpsc::channel(capacity);
1,269✔
30
        let worker = tokio::spawn(upload_loop(storage, requests));
1,269✔
31
        (Self { sender }, worker)
1,269✔
32
    }
1,269✔
33

34
    pub fn create_upload(
1✔
35
        &self,
1✔
36
        key: String,
1✔
37
    ) -> Result<oneshot::Receiver<String>, SendError<String>> {
1✔
38
        self.send_request(key, RequestKind::CreateUpload)
1✔
39
    }
1✔
40

41
    pub fn upload_chunk(
1✔
42
        &self,
1✔
43
        key: String,
1✔
44
        data: Vec<u8>,
1✔
45
    ) -> Result<oneshot::Receiver<String>, SendError<String>> {
1✔
46
        self.send_request(key, RequestKind::UploadChunk(data))
1✔
47
    }
1✔
48

49
    pub fn complete_upload(
1✔
50
        &self,
1✔
51
        key: String,
1✔
52
    ) -> Result<oneshot::Receiver<String>, SendError<String>> {
1✔
53
        self.send_request(key, RequestKind::CompleteUpload)
1✔
54
    }
1✔
55

56
    pub fn upload_object(
484✔
57
        &self,
484✔
58
        key: String,
484✔
59
        data: Vec<u8>,
484✔
60
    ) -> Result<oneshot::Receiver<String>, SendError<String>> {
484✔
61
        self.send_request(key, RequestKind::UploadObject(data))
484✔
62
    }
484✔
63

64
    fn send_request(
487✔
65
        &self,
487✔
66
        key: String,
487✔
67
        kind: RequestKind,
487✔
68
    ) -> Result<oneshot::Receiver<String>, SendError<String>> {
487✔
69
        let (return_sender, return_receiver) = oneshot::channel();
487✔
70
        self.sender
487✔
71
            .blocking_send(Request {
487✔
72
                key,
487✔
73
                kind,
487✔
74
                return_sender,
487✔
75
            })
487✔
76
            .map_err(|e| SendError(e.0.key))?;
487✔
77
        Ok(return_receiver)
474✔
78
    }
487✔
79
}
80

81
#[derive(Debug)]
×
82
struct Request {
83
    key: String,
84
    kind: RequestKind,
85
    return_sender: oneshot::Sender<String>,
86
}
87

88
#[derive(Debug, Clone)]
19✔
89
enum RequestKind {
90
    CreateUpload,
91
    UploadChunk(Vec<u8>),
92
    CompleteUpload,
93
    UploadObject(Vec<u8>),
94
}
95

96
struct MultipartUpload {
97
    id: String,
98
    parts: Vec<(NonZeroU16, String)>,
99
}
100

101
async fn upload_loop(storage: Box<dyn Storage>, mut requests: Receiver<Request>) {
1,269✔
102
    let mut multipart_uploads = HashMap::new();
1,008✔
103

104
    while let Some(request) = requests.recv().await {
1,026✔
105
        loop {
106
            match handle_request(
19✔
107
                &*storage,
19✔
108
                &mut multipart_uploads,
19✔
109
                &request.key,
19✔
110
                request.kind.clone(),
19✔
111
            )
19✔
112
            .await
21✔
113
            {
114
                Ok(()) => {
115
                    if let Err(key) = request.return_sender.send(request.key) {
18✔
116
                        debug!("No one is waiting for the uploading result of {}", key);
×
117
                    }
2✔
118
                    break;
18✔
119
                }
120
                Err(Error::Storage(e)) => {
×
121
                    const RETRY_INTERVAL: Duration = Duration::from_secs(5);
122
                    error!(
123
                        "error uploading {}: {e}. Retrying in {RETRY_INTERVAL:?}",
×
124
                        request.key
125
                    );
126
                    tokio::time::sleep(RETRY_INTERVAL).await;
×
127
                }
128
                Err(e) => {
×
129
                    error!("error uploading {}: {e}", request.key);
×
130
                    break;
×
131
                }
132
            }
133
        }
134
    }
135
}
1,007✔
136

137
#[derive(Debug, Error)]
×
138
enum Error {
×
139
    #[error("storage error: {0}")]
140
    Storage(#[from] super::Error),
141
    #[error("upload already exists")]
142
    UploadAlreadyExists,
143
    #[error("upload not found")]
144
    UploadNotFound,
145
    #[error("too many parts")]
146
    TooManyParts,
147
}
148

149
async fn handle_request(
25✔
150
    storage: &dyn Storage,
25✔
151
    multipart_uploads: &mut HashMap<String, MultipartUpload>,
25✔
152
    key: &str,
25✔
153
    request: RequestKind,
25✔
154
) -> Result<(), Error> {
25✔
155
    match request {
25✔
156
        RequestKind::CreateUpload => match multipart_uploads.entry(key.to_string()) {
4✔
157
            Entry::Vacant(entry) => {
3✔
158
                let upload_id = storage.create_multipart_upload(entry.key().clone()).await?;
3✔
159
                entry.insert(MultipartUpload {
3✔
160
                    id: upload_id,
3✔
161
                    parts: vec![],
3✔
162
                });
3✔
163
            }
164
            Entry::Occupied(_) => {
165
                return Err(Error::UploadAlreadyExists);
1✔
166
            }
167
        },
168
        RequestKind::UploadChunk(data) => {
3✔
169
            let upload = multipart_uploads
3✔
170
                .get_mut(key)
3✔
171
                .ok_or(Error::UploadNotFound)?;
3✔
172
            let part_number = match upload.parts.last() {
2✔
173
                Some((last_part_number, _)) => {
×
174
                    last_part_number.checked_add(1).ok_or(Error::TooManyParts)?
×
175
                }
176
                None => nonzero!(1u16),
2✔
177
            };
178
            let part_id = storage
2✔
179
                .upload_part(key.to_string(), upload.id.clone(), part_number, data)
2✔
180
                .await?;
2✔
181
            upload.parts.push((part_number, part_id));
2✔
182
        }
183
        RequestKind::CompleteUpload => {
184
            let (key, upload) = multipart_uploads
2✔
185
                .remove_entry(key)
2✔
186
                .ok_or(Error::UploadNotFound)?;
2✔
187
            storage
2✔
188
                .complete_multipart_upload(key, upload.id, upload.parts)
2✔
189
                .await?;
6✔
190
        }
191
        RequestKind::UploadObject(data) => {
16✔
192
            storage.put_object(key.to_string(), data).await?;
17✔
193
        }
194
    }
195
    Ok(())
22✔
196
}
24✔
197

198
#[cfg(test)]
199
mod tests {
200
    use crate::storage::create_temp_dir_local_storage;
201

202
    use super::*;
203

204
    #[tokio::test]
1✔
205
    async fn test_handle_request() {
1✔
206
        let (_temp_dir, storage) = create_temp_dir_local_storage().await;
1✔
207
        let mut multipart_uploads = HashMap::new();
1✔
208
        let key = "test";
1✔
209
        let data = vec![1, 2, 3];
1✔
210
        handle_request(
1✔
211
            &*storage,
1✔
212
            &mut multipart_uploads,
1✔
213
            key,
1✔
214
            RequestKind::CreateUpload,
1✔
215
        )
1✔
216
        .await
×
217
        .unwrap();
1✔
218
        handle_request(
1✔
219
            &*storage,
1✔
220
            &mut multipart_uploads,
1✔
221
            key,
1✔
222
            RequestKind::UploadChunk(data.clone()),
1✔
223
        )
1✔
224
        .await
1✔
225
        .unwrap();
1✔
226
        handle_request(
1✔
227
            &*storage,
1✔
228
            &mut multipart_uploads,
1✔
229
            key,
1✔
230
            RequestKind::CompleteUpload,
1✔
231
        )
1✔
232
        .await
3✔
233
        .unwrap();
1✔
234
        assert_eq!(
1✔
235
            storage.download_object(key.to_string()).await.unwrap(),
4✔
236
            data
237
        );
238
        assert!(multipart_uploads.is_empty());
1✔
239
    }
240

241
    #[tokio::test]
1✔
242
    async fn test_handle_request_upload_already_exists() {
1✔
243
        let (_temp_dir, storage) = create_temp_dir_local_storage().await;
1✔
244
        let mut multipart_uploads = HashMap::new();
1✔
245
        let key = "test";
1✔
246
        handle_request(
1✔
247
            &*storage,
1✔
248
            &mut multipart_uploads,
1✔
249
            key,
1✔
250
            RequestKind::CreateUpload,
1✔
251
        )
1✔
252
        .await
×
253
        .unwrap();
1✔
254
        let error = handle_request(
1✔
255
            &*storage,
1✔
256
            &mut multipart_uploads,
1✔
257
            key,
1✔
258
            RequestKind::CreateUpload,
1✔
259
        )
1✔
260
        .await
×
261
        .unwrap_err();
1✔
262
        assert!(matches!(error, Error::UploadAlreadyExists));
1✔
263
    }
264

265
    #[tokio::test]
1✔
266
    async fn test_handle_request_upload_not_found() {
1✔
267
        let (_temp_dir, storage) = create_temp_dir_local_storage().await;
1✔
268
        let mut multipart_uploads = HashMap::new();
1✔
269
        let key = "test";
1✔
270
        let error = handle_request(
1✔
271
            &*storage,
1✔
272
            &mut multipart_uploads,
1✔
273
            key,
1✔
274
            RequestKind::UploadChunk(vec![]),
1✔
275
        )
1✔
276
        .await
×
277
        .unwrap_err();
1✔
278
        assert!(matches!(error, Error::UploadNotFound));
1✔
279
    }
280
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc