• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tu6ge / oss-rs / 5829505807

pending completion
5829505807

push

github

tu6ge
Support std IO (#26)

* feat(decode)!: change init object fn

* todo

* feat(error): OssError add more info

When the OssError code is SignatureDoesNotMatch, show the expected
signature string.

* feat(io): support write

* feat: blocking support

* feat: blocking read

* feat: 允许读取的数据与目标数组长度不一致

* feat: 分离 Rc 和内部数据

* feat: support Arc Object Content

* feat: 解决多次写入少量数据导致oss错误的问题

当多次写入少量数据,不符合分片的最小数量时,调用 oss 接口会导致报错

* refactor

* feat: 交换 arc 与 rc 的位置

* docs(io)

* docs(io)

* style

* chore: default close blocking

* fix

* style

* feat(io): change seek

* feat(io): change error type

* style

* feat(bucket)!: change base_bucket_info

* test(io)

* test(doc): remove deprecated

* test(io)

* test(io)

* test(io)

* style(io): clippy

* chore: support more derive

* refactor

* docs

1293 of 1293 new or added lines in 19 files covered. (100.0%)

7298 of 7685 relevant lines covered (94.96%)

9.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.23
/src/object/content/blocking.rs
1
//! 读写 object 内容
6✔
2

3
use std::{
4
    io::{Read, Result as IoResult, Seek, SeekFrom, Write},
5
    ops::{Deref, DerefMut},
6
    rc::Rc,
7
};
8

9
use http::{header::CONTENT_LENGTH, HeaderValue, Method};
10
use url::Url;
11

12
#[cfg(test)]
13
use super::mock::blocking::Files;
14
#[cfg(not(test))]
15
use crate::file::BlockingFiles;
16
use crate::{
17
    file::blocking::AlignBuilder,
18
    object::InitObject,
19
    types::{
20
        object::{InvalidObjectPath, SetObjectPath},
21
        CanonicalizedResource,
22
    },
23
    ClientRc as Client, ObjectPath,
24
};
25

26
use super::{super::ObjectsBlocking, ContentError, ContentErrorKind, Inner};
27

28
/// # object 内容
29
/// [OSS 分片上传文档](https://help.aliyun.com/zh/oss/user-guide/multipart-upload-12)
30
//#[derive(Debug)]
31
pub struct Content {
32
    client: Rc<Client>,
33
    inner: Inner,
34
}
35

36
impl Write for Content {
37
    // 写入缓冲区
38
    fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
×
39
        self.inner.write(buf)
×
40
    }
×
41

42
    // 按分片数量选择上传 OSS 的方式
43
    fn flush(&mut self) -> IoResult<()> {
×
44
        let len = self.content_part.len();
×
45

46
        //println!("len: {}", len);
47

48
        if len == 0 {
×
49
            return Ok(());
×
50
        }
51
        if len == 1 {
×
52
            return self.upload();
×
53
        }
54

55
        self.upload_multi()
×
56
    }
×
57
}
58

59
impl Read for Content {
60
    fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
4✔
61
        let len = buf.len();
4✔
62
        if len as u64 > Inner::MAX_SIZE {
4✔
63
            return Err(ContentError::new(ContentErrorKind::OverflowMaxSize).into());
1✔
64
        }
65

66
        let end = self.current_pos + len as u64;
3✔
67
        let vec = self
6✔
68
            .client
69
            .get_object(self.path.clone(), self.current_pos..end - 1)?;
3✔
70

71
        let len = std::cmp::min(vec.len(), buf.len());
3✔
72
        buf[..len].copy_from_slice(&vec[..len]);
3✔
73

74
        Ok(len)
3✔
75
    }
4✔
76
}
77

78
impl Seek for Content {
79
    fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
×
80
        self.inner.seek(pos)
×
81
    }
×
82
}
83

84
impl Default for Content {
85
    fn default() -> Self {
13✔
86
        Self {
13✔
87
            client: Rc::default(),
13✔
88
            inner: Inner::default(),
13✔
89
        }
90
    }
13✔
91
}
92

93
impl Deref for Content {
94
    type Target = Inner;
95
    fn deref(&self) -> &Self::Target {
60✔
96
        &self.inner
97
    }
60✔
98
}
99

100
impl DerefMut for Content {
101
    fn deref_mut(&mut self) -> &mut Inner {
57✔
102
        &mut self.inner
103
    }
57✔
104
}
105

106
/// 带内容的 object 列表
107
pub type List = ObjectsBlocking<Content>;
108

109
impl InitObject<Content> for List {
110
    fn init_object(&mut self) -> Option<Content> {
1✔
111
        Some(Content {
1✔
112
            client: self.client(),
1✔
113
            ..Default::default()
1✔
114
        })
115
    }
1✔
116
}
117

118
impl Content {
119
    /// 从 client 创建
120
    pub fn from_client(client: Rc<Client>) -> Content {
9✔
121
        Content {
9✔
122
            client,
123
            ..Default::default()
9✔
124
        }
125
    }
9✔
126
    /// 设置 ObjectPath
127
    pub fn path<P>(mut self, path: P) -> Result<Self, InvalidObjectPath>
9✔
128
    where
129
        P: TryInto<ObjectPath>,
130
        P::Error: Into<InvalidObjectPath>,
131
    {
132
        self.path = path.try_into().map_err(Into::into)?;
9✔
133
        self.content_type_with_path();
9✔
134
        Ok(self)
9✔
135
    }
9✔
136

137
    fn part_canonicalized<'q>(&self, query: &'q str) -> (Url, CanonicalizedResource) {
9✔
138
        let mut url = self.client.get_bucket_url();
9✔
139
        url.set_object_path(&self.path);
9✔
140
        url.set_query(Some(query));
9✔
141

142
        let bucket = self.client.get_bucket_name();
9✔
143
        (
9✔
144
            url,
9✔
145
            CanonicalizedResource::new(format!("/{}/{}?{}", bucket, self.path.as_ref(), query)),
9✔
146
        )
147
    }
9✔
148

149
    fn upload(&mut self) -> IoResult<()> {
1✔
150
        let content = self.content_part.pop().unwrap();
1✔
151
        self.client
2✔
152
            .put_content_base(content, self.content_type, self.path.clone())
1✔
153
            .map(|_| ())
1✔
154
            .map_err(Into::into)
155
    }
1✔
156

157
    fn upload_multi(&mut self) -> IoResult<()> {
1✔
158
        self.init_multi()?;
1✔
159

160
        let mut i = 1;
1✔
161
        let mut size: u64 = 0;
1✔
162
        self.content_part.reverse();
1✔
163
        while let Some(item) = self.content_part.pop() {
3✔
164
            size += item.len() as u64;
2✔
165
            self.upload_part(i, item)?;
2✔
166
            i += 1;
2✔
167
        }
168

169
        self.complete_multi()?;
1✔
170
        self.content_size = size;
1✔
171

172
        Ok(())
1✔
173
    }
1✔
174

175
    /// 初始化批量上传
176
    fn init_multi(&mut self) -> Result<(), ContentError> {
2✔
177
        const UPLOADS: &str = "uploads";
178

179
        let (url, resource) = self.part_canonicalized(UPLOADS);
2✔
180
        let xml = self
4✔
181
            .client
182
            .builder(Method::POST, url, resource)?
2✔
183
            .send_adjust_error()?
×
184
            .text()?;
×
185

186
        self.parse_upload_id(&xml)
2✔
187
    }
2✔
188

189
    /// 上传分块
190
    fn upload_part(&mut self, index: u16, buf: Vec<u8>) -> Result<(), ContentError> {
6✔
191
        const ETAG: &str = "ETag";
192

193
        if self.upload_id.is_empty() {
6✔
194
            return Err(ContentError::new(ContentErrorKind::NoFoundUploadId));
1✔
195
        }
196

197
        if self.etag_list.len() >= Inner::MAX_PARTS_COUNT as usize {
5✔
198
            return Err(ContentError::new(ContentErrorKind::OverflowMaxPartsCount));
1✔
199
        }
200
        if buf.len() > Inner::PART_SIZE_MAX {
4✔
201
            return Err(ContentError::new(ContentErrorKind::OverflowPartSize));
1✔
202
        }
203

204
        let query = format!("partNumber={}&uploadId={}", index, self.upload_id);
3✔
205

206
        let (url, resource) = self.part_canonicalized(&query);
3✔
207

208
        let content_length = buf.len().to_string();
3✔
209
        let headers = vec![(
6✔
210
            CONTENT_LENGTH,
3✔
211
            HeaderValue::from_str(&content_length).unwrap(),
3✔
212
        )];
213

214
        let resp = self
6✔
215
            .client
216
            .builder_with_header(Method::PUT, url, resource, headers)?
3✔
217
            .body(buf)
3✔
218
            .send_adjust_error()?;
×
219

220
        let etag = resp
6✔
221
            .headers()
222
            .get(ETAG)
223
            .ok_or(ContentError::new(ContentErrorKind::NoFoundEtag))?;
3✔
224

225
        //println!("etag: {:?}", etag);
226

227
        // 59A2A10DD1686F679EE885FC1EBA5183
228
        //let etag = &(etag.to_str().unwrap())[1..33];
229

230
        self.etag_list.push((index, etag.to_owned()));
3✔
231

232
        Ok(())
3✔
233
    }
6✔
234

235
    /// 完成分块上传
236
    fn complete_multi(&mut self) -> Result<(), ContentError> {
4✔
237
        if self.upload_id.is_empty() {
4✔
238
            return Err(ContentError::new(ContentErrorKind::NoFoundUploadId));
2✔
239
        }
240

241
        let xml = self.etag_list_xml()?;
2✔
242

243
        let query = format!("uploadId={}", self.upload_id);
2✔
244

245
        let (url, resource) = self.part_canonicalized(&query);
2✔
246

247
        let content_length = xml.len().to_string();
2✔
248
        let headers = vec![(
4✔
249
            CONTENT_LENGTH,
2✔
250
            HeaderValue::from_str(&content_length).unwrap(),
2✔
251
        )];
252

253
        let _resp = self
4✔
254
            .client
255
            .builder_with_header(Method::POST, url, resource, headers)?
2✔
256
            .body(xml)
2✔
257
            .send_adjust_error()?;
×
258

259
        //println!("resp: {}", resp);
260
        self.etag_list.clear();
2✔
261
        self.upload_id = String::default();
2✔
262

263
        Ok(())
2✔
264
    }
4✔
265

266
    /// 取消分块上传
267
    pub fn abort_multi(&mut self) -> Result<(), ContentError> {
1✔
268
        if self.upload_id.is_empty() {
1✔
269
            return Err(ContentError::new(ContentErrorKind::NoFoundUploadId));
×
270
        }
271
        let query = format!("uploadId={}", self.upload_id);
1✔
272

273
        let (url, resource) = self.part_canonicalized(&query);
1✔
274
        let _resp = self
2✔
275
            .client
276
            .builder(Method::DELETE, url, resource)?
1✔
277
            .send_adjust_error()?;
×
278

279
        //println!("resp: {:?}", resp);
280
        self.etag_list.clear();
1✔
281
        self.upload_id = String::default();
1✔
282

283
        Ok(())
1✔
284
    }
1✔
285
}
286

287
// impl Drop for Content {
288
//     fn drop(&mut self) {
289
//         if self.upload_id.is_empty() == false {
290
//             self.abort_multi();
291
//         }
292
//     }
293
// }
294

295
impl From<Client> for Content {
296
    fn from(value: Client) -> Self {
×
297
        Content {
×
298
            client: Rc::new(value),
×
299
            ..Default::default()
×
300
        }
301
    }
×
302
}
303

304
#[cfg(test)]
mod tests {
    use super::super::test_suite_block::{
        AbortMulti, CompleteMulti, InitMulti, UploadMulti, UploadPart,
    };
    use super::*;
    use crate::decode::RefineObject;

    /// Builds a `Content` for the object `aaa.txt` on top of `client`.
    fn content_with(client: Client) -> Content {
        Content::from_client(Rc::new(client))
            .path("aaa.txt")
            .unwrap()
    }

    /// `Content` must implement `RefineObject` and deref to `Inner`.
    #[test]
    fn assert_impl() {
        fn impl_fn<T, E>(_: T)
        where
            T: RefineObject<E>,
            E: std::error::Error + 'static,
        {
        }
        fn impl_deref<T>(_: T)
        where
            T: Deref<Target = Inner>,
        {
        }

        impl_fn(Content::default());
        impl_deref(Content::default());
    }

    /// Reading rejects oversized buffers and copies at most `buf.len()` bytes.
    #[test]
    fn read() {
        let mut content = content_with(Client::test_init());

        let mut oversized = [0u8; 201];
        let err = content.read(&mut oversized).unwrap_err();
        assert_eq!(err.to_string(), "max size must be lt 48.8TB");

        let mut ten = [0u8; 10];
        let read_len = content.read(&mut ten).unwrap();
        assert_eq!(ten, [1u8, 2, 3, 4, 5, 0, 0, 0, 0, 0]);
        assert_eq!(read_len, 5);

        let mut three = [0u8; 3];
        let read_len = content.read(&mut three).unwrap();
        assert_eq!(three, [1u8, 2, 3]);
        assert_eq!(read_len, 3);

        content.current_pos = 10;
        let mut three_again = [0u8; 3];
        let read_len = content.read(&mut three_again).unwrap();
        assert_eq!(three_again, [1u8, 2, 3]);
        assert_eq!(read_len, 3);
    }

    /// A list hands its own client to the objects it creates.
    #[test]
    fn init_object() {
        let client = Client::test_init();
        let mut list = List::default();
        list.set_client(Rc::new(client.clone()));

        let content = list.init_object().unwrap();

        assert_eq!(content.client.bucket, client.bucket);
        assert_eq!(content.inner, Inner::default());
    }

    /// `from_client` keeps the client and defaults everything else.
    #[test]
    fn from_client() {
        let client = Client::test_init();

        let content = Content::from_client(Rc::new(client.clone()));

        assert_eq!(content.client.bucket, client.bucket);
        assert_eq!(content.inner, Inner::default());
    }

    /// `path` stores the given object path.
    #[test]
    fn path() {
        let content = Content::default().path("aaa.txt").unwrap();
        assert_eq!(content.path, "aaa.txt");
    }

    /// The query string ends up in both the URL and the canonicalized resource.
    #[test]
    fn part_canonicalized() {
        let content = content_with(Client::test_init());

        let (url, resource) = content.part_canonicalized("first=1&second=2");

        assert_eq!(
            url.as_str(),
            "https://bar.oss-cn-qingdao.aliyuncs.com/aaa.txt?first=1&second=2"
        );
        assert_eq!(resource.to_string(), "/bar/aaa.txt?first=1&second=2");
    }

    /// A single buffered part is uploaded in one request.
    #[test]
    fn upload() {
        let mut content = content_with(Client::test_init());
        content.content_part.push(b"bbb".to_vec());

        content.upload().unwrap();
    }

    /// Initialising a multipart upload stores the returned upload id.
    #[test]
    fn init_multi() {
        let client = Client::test_init().middleware(Rc::new(InitMulti {}));
        let mut content = content_with(client);

        content.init_multi().unwrap();

        assert_eq!(content.upload_id, "foo_upload_id");
    }

    /// `upload_part` validates its preconditions and records the etag.
    #[test]
    fn upload_part() {
        let client = Client::test_init().middleware(Rc::new(UploadPart {}));
        let mut content = content_with(client);

        let err = content.upload_part(1, b"bbb".to_vec()).unwrap_err();
        assert_eq!(err.to_string(), "not found upload id");

        content.upload_id = "foo_upload_id".to_string();
        for _ in 0..10 {
            content.etag_list.push((1, "a".parse().unwrap()));
        }
        let err = content.upload_part(1, b"bbb".to_vec()).unwrap_err();
        assert_eq!(err.to_string(), "overflow max parts count");
        content.etag_list.clear();

        let too_big = b"012345678901234567890".to_vec();
        let err = content.upload_part(1, too_big).unwrap_err();
        assert_eq!(err.to_string(), "part size must be between 100k and 5G");

        content.upload_part(2, b"bbb".to_vec()).unwrap();
        let (index, value) = content.etag_list.pop().unwrap();
        assert_eq!(index, 2);
        assert_eq!(value.to_str().unwrap(), "foo_etag");
    }

    /// Completing needs an upload id and resets the multipart state.
    #[test]
    fn complete_multi() {
        let client = Client::test_init().middleware(Rc::new(CompleteMulti {}));
        let mut content = content_with(client);

        let err = content.complete_multi().unwrap_err();
        assert_eq!(err.to_string(), "not found upload id");

        content.upload_id = "foo_upload_id".to_string();
        content.etag_list.push((1, "aaa".parse().unwrap()));
        content.etag_list.push((2, "bbb".parse().unwrap()));

        content.complete_multi().unwrap();

        assert!(content.etag_list.is_empty());
        assert!(content.upload_id.is_empty());
    }

    /// A two-part upload reports the combined size of both parts.
    #[test]
    fn upload_multi() {
        let client = Client::test_init().middleware(Rc::new(UploadMulti {}));
        let mut content = content_with(client);

        content.content_part.push(b"aaa".to_vec());
        content.content_part.push(b"bbb".to_vec());

        content.upload_multi().unwrap();

        assert_eq!(content.content_size, 6);
    }

    /// Aborting resets the multipart state.
    #[test]
    fn abort_multi() {
        let client = Client::test_init().middleware(Rc::new(AbortMulti {}));
        let mut content = content_with(client);

        let err = content.complete_multi().unwrap_err();
        assert_eq!(err.to_string(), "not found upload id");

        content.upload_id = "foo_upload_id".to_string();
        content.etag_list.push((1, "aaa".parse().unwrap()));

        content.abort_multi().unwrap();

        assert!(content.etag_list.is_empty());
        assert!(content.upload_id.is_empty());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc