• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

0xmichalis / nftbk / 18378365690

09 Oct 2025 01:42PM UTC coverage: 34.076% (-31.8%) from 65.845%
18378365690

push

github

0xmichalis
ci: tarpaulin install check update

887 of 2603 relevant lines covered (34.08%)

6.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.13
/src/httpclient/stream.rs
1
use anyhow::Result;
2
use async_compression::tokio::bufread::GzipDecoder;
3
use futures_util::TryStreamExt;
4
use std::path::Path;
5
use tokio::io::{AsyncRead, AsyncWriteExt, BufReader};
6
use tokio_util::io::StreamReader;
7

8
pub(crate) async fn stream_reader_to_file<R: AsyncRead + Unpin>(
33✔
9
    reader: &mut R,
10
    file: &mut tokio::fs::File,
11
    file_path: &Path,
12
) -> anyhow::Result<std::path::PathBuf> {
13
    tokio::io::copy(reader, file).await.map_err(|e| {
133✔
14
        anyhow::anyhow!(
1✔
15
            "Failed to stream content to file {}: {}",
1✔
16
            file_path.display(),
2✔
17
            e
×
18
        )
19
    })?;
20
    file.flush()
32✔
21
        .await
×
22
        .map_err(|e| anyhow::anyhow!("Failed to flush file {}: {}", file_path.display(), e))?;
32✔
23
    Ok(file_path.to_path_buf())
32✔
24
}
25

26
pub(crate) async fn stream_http_to_file(
22✔
27
    response: reqwest::Response,
28
    file_path: &Path,
29
) -> anyhow::Result<std::path::PathBuf> {
30
    let stream = response.bytes_stream().map_err(std::io::Error::other);
88✔
31
    let mut reader = StreamReader::new(stream);
66✔
32

33
    let mut file_path = file_path.to_path_buf();
66✔
34
    let (detected_ext, prefix_buf) =
44✔
35
        crate::content::extensions::detect_extension_from_stream(&mut reader).await;
66✔
36
    if !crate::content::extensions::has_known_extension(&file_path) {
22✔
37
        if let Some(detected_ext) = detected_ext {
8✔
38
            let current_path_str = file_path.to_string_lossy();
39
            tracing::debug!("Appending detected media extension: {}", detected_ext);
×
40
            file_path = std::path::PathBuf::from(format!("{current_path_str}.{detected_ext}"));
41
        }
42
    }
43

44
    let mut file = tokio::fs::File::create(&file_path)
65✔
45
        .await
22✔
46
        .map_err(|e| anyhow::anyhow!("Failed to create file {}: {}", file_path.display(), e))?;
25✔
47
    if !prefix_buf.is_empty() {
48
        file.write_all(&prefix_buf).await.map_err(|e| {
42✔
49
            anyhow::anyhow!(
×
50
                "Failed to write prefix to file {}: {}",
×
51
                file_path.display(),
×
52
                e
53
            )
54
        })?;
55
    }
56
    let result = stream_reader_to_file(&mut reader, &mut file, &file_path).await?;
42✔
57

58
    file.sync_all().await.map_err(|e| {
21✔
59
        anyhow::anyhow!("Failed to sync file {} to disk: {}", file_path.display(), e)
×
60
    })?;
61

62
    Ok(result)
21✔
63
}
64

65
pub(crate) async fn stream_gzip_http_to_file(
10✔
66
    response: reqwest::Response,
67
    file_path: &Path,
68
) -> Result<std::path::PathBuf> {
69
    let mut file = tokio::fs::File::create(file_path).await?;
30✔
70
    let stream = response.bytes_stream().map_err(std::io::Error::other);
71
    let reader = StreamReader::new(stream);
72
    let mut decoder = GzipDecoder::new(BufReader::new(reader));
73
    stream_reader_to_file(&mut decoder, &mut file, file_path).await
74
}
75

76
#[cfg(test)]
77
mod stream_reader_to_file_tests {
78
    use super::*;
79
    use tempfile::TempDir;
80
    use tokio::io::AsyncReadExt;
81

82
    #[tokio::test]
83
    async fn writes_all_bytes_from_reader_to_file() {
84
        let temp_dir = TempDir::new().unwrap();
85
        let file_path = temp_dir.path().join("ten_kib.bin");
86
        let mut file = tokio::fs::File::create(&file_path).await.unwrap();
87

88
        // Create an AsyncRead that yields exactly 10 KiB of byte 'a'
89
        let mut reader = tokio::io::repeat(b'a').take(10 * 1024);
90

91
        let result = stream_reader_to_file(&mut reader, &mut file, &file_path).await;
92
        assert!(result.is_ok());
93

94
        let data = tokio::fs::read(&file_path).await.unwrap();
95
        assert_eq!(data.len(), 10 * 1024);
96
        assert!(data.iter().all(|b| *b == b'a'));
97
    }
98

99
    struct ErrorReader;
100

101
    impl tokio::io::AsyncRead for ErrorReader {
102
        fn poll_read(
103
            self: std::pin::Pin<&mut Self>,
104
            _cx: &mut std::task::Context<'_>,
105
            _buf: &mut tokio::io::ReadBuf<'_>,
106
        ) -> std::task::Poll<std::io::Result<()>> {
107
            std::task::Poll::Ready(Err(std::io::Error::other("simulated read error")))
108
        }
109
    }
110

111
    #[tokio::test]
112
    async fn returns_error_when_reader_fails() {
113
        let temp_dir = TempDir::new().unwrap();
114
        let file_path = temp_dir.path().join("out.bin");
115
        let mut file = tokio::fs::File::create(&file_path).await.unwrap();
116

117
        let mut reader = ErrorReader;
118
        let err = stream_reader_to_file(&mut reader, &mut file, &file_path)
119
            .await
120
            .expect_err("expected error");
121
        let msg = format!("{err:#}");
122
        assert!(msg.contains("Failed to stream content to file"));
123
    }
124
}
125

126
#[cfg(test)]
127
mod stream_http_to_file_tests {
128
    use super::*;
129
    use tempfile::TempDir;
130
    use wiremock::{
131
        matchers::{method, path},
132
        Mock, MockServer, ResponseTemplate,
133
    };
134

135
    #[tokio::test]
136
    async fn streams_large_http_body_to_file() {
137
        let mock_server = MockServer::start().await;
138
        let url = format!("{}/large-file", mock_server.uri());
139

140
        let large_body = "x".repeat(1024 * 1024);
141
        Mock::given(method("GET"))
142
            .and(path("/large-file"))
143
            .respond_with(ResponseTemplate::new(200).set_body_string(large_body))
144
            .mount(&mock_server)
145
            .await;
146

147
        let temp_dir = TempDir::new().unwrap();
148
        let file_path = temp_dir.path().join("large.txt");
149

150
        let response = reqwest::get(&url).await.unwrap();
151
        let result = stream_http_to_file(response, &file_path).await;
152
        assert!(result.is_ok());
153
        let metadata = std::fs::metadata(&file_path).unwrap();
154
        assert_eq!(metadata.len(), 1024 * 1024);
155
    }
156

157
    #[tokio::test]
158
    async fn appends_detected_extension_when_missing() {
159
        let mock_server = MockServer::start().await;
160
        let url = format!("{}/png", mock_server.uri());
161

162
        // Minimal PNG signature + padding
163
        let mut body = vec![0x89, b'P', b'N', b'G', 0x0D, 0x0A, 0x1A, 0x0A];
164
        body.extend_from_slice(&[0u8; 64]);
165
        Mock::given(method("GET"))
166
            .and(path("/png"))
167
            .respond_with(ResponseTemplate::new(200).set_body_bytes(body))
168
            .mount(&mock_server)
169
            .await;
170

171
        let temp_dir = TempDir::new().unwrap();
172
        let base_path = temp_dir.path().join("image");
173
        let response = reqwest::get(&url).await.unwrap();
174
        let result_path = stream_http_to_file(response, &base_path).await.unwrap();
175

176
        // Should have appended .png
177
        assert!(result_path.extension().is_some());
178
        assert_eq!(result_path.extension().unwrap(), "png");
179
        assert!(result_path.exists());
180
    }
181

182
    #[tokio::test]
183
    async fn does_not_append_when_known_extension_present() {
184
        let mock_server = MockServer::start().await;
185
        let url = format!("{}/any", mock_server.uri());
186
        Mock::given(method("GET"))
187
            .and(path("/any"))
188
            .respond_with(ResponseTemplate::new(200).set_body_string("data"))
189
            .mount(&mock_server)
190
            .await;
191

192
        let temp_dir = TempDir::new().unwrap();
193
        let path_with_ext = temp_dir.path().join("photo.jpg");
194
        let response = reqwest::get(&url).await.unwrap();
195
        let result_path = stream_http_to_file(response, &path_with_ext).await.unwrap();
196

197
        assert_eq!(result_path, path_with_ext);
198
        assert!(result_path.exists());
199
    }
200

201
    #[tokio::test]
202
    async fn returns_error_when_file_creation_fails() {
203
        let mock_server = MockServer::start().await;
204
        let url = format!("{}/small", mock_server.uri());
205
        Mock::given(method("GET"))
206
            .and(path("/small"))
207
            .respond_with(ResponseTemplate::new(200).set_body_string("abc"))
208
            .mount(&mock_server)
209
            .await;
210

211
        // Non-existent parent directory to trigger create error
212
        let temp_dir = TempDir::new().unwrap();
213
        let bad_path = temp_dir.path().join("no_such_dir").join("file.bin");
214

215
        let response = reqwest::get(&url).await.unwrap();
216
        let err = stream_http_to_file(response, &bad_path)
217
            .await
218
            .expect_err("expected creation error");
219
        let msg = format!("{err:#}");
220
        assert!(msg.contains("Failed to create file"));
221
    }
222
}
223

224
#[cfg(test)]
225
mod stream_gzip_http_to_file_tests {
226
    use super::*;
227
    use flate2::{write::GzEncoder, Compression};
228
    use tempfile::TempDir;
229
    use wiremock::{
230
        matchers::{method, path},
231
        Mock, MockServer, ResponseTemplate,
232
    };
233

234
    #[tokio::test]
235
    async fn decodes_gzipped_http_body_to_file() {
236
        let mock_server = MockServer::start().await;
237
        let url = format!("{}/gz", mock_server.uri());
238

239
        // Prepare gzipped payload for body "hello gzip"
240
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
241
        std::io::Write::write_all(&mut encoder, b"hello gzip").unwrap();
242
        let gzipped = encoder.finish().unwrap();
243

244
        Mock::given(method("GET"))
245
            .and(path("/gz"))
246
            .respond_with(
247
                ResponseTemplate::new(200)
248
                    .set_body_bytes(gzipped)
249
                    .insert_header("Content-Encoding", "gzip"),
250
            )
251
            .mount(&mock_server)
252
            .await;
253

254
        let temp_dir = TempDir::new().unwrap();
255
        let file_path = temp_dir.path().join("out.txt");
256

257
        let response = reqwest::get(&url).await.unwrap();
258
        let result = stream_gzip_http_to_file(response, &file_path).await;
259
        assert!(result.is_ok());
260

261
        let content = tokio::fs::read_to_string(&file_path).await.unwrap();
262
        assert_eq!(content, "hello gzip");
263
    }
264
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc